4.1 主题的管理
主题的管理包括创建主题、查看主题信息、修改主题和删除主题等操作。可以通过 Kafka提供的 kafka-topics.sh 脚本来执行这些操作,这个脚本位于$KAFKA_HOME/bin/目录下,其核心代码仅有一行,具体如下:
可以看到其实质上是调用了kafka.admin.TopicCommand类来执行主题管理的操作。
主题的管理并非只有使用 kafka-topics.sh 脚本这一种方式,我们还可以通过KafkaAdminClient 的方式实现(这种方式实质上是通过发送 CreateTopicsRequest、DeleteTopicsRequest 等请求来实现的,对于 XXXRequest 系列的细节在 6.1 节中会有详细的介绍),甚至我们还可以通过直接操纵日志文件和ZooKeeper节点来实现。下面按照创建主题、查看主题信息、修改主题、删除主题的顺序来介绍其中的操作细节。
4.1.1 创建主题
如果broker端配置参数auto.create.topics.enable设置为true(默认值就是true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions (默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会按照配置参数num.partitions和default.replication.factor的值来创建一个相应的主题。很多时候,这种自动创建主题的行为都是非预期的。除非有特殊应用需求,否则不建议将auto.create.topics.enable参数设置为true,这个参数会增加主题的管理与维护的难度。
更加推荐也更加通用的方式是通过kafka-topics.sh脚本来创建主题。在1.3节演示消息的生产与消费时就通过这种方式创建了一个分区数为4、副本因子为3的主题topic-demo。下面通过创建另一个主题topic-create来回顾一下这种创建主题的方式,示例如下:
上面的示例中创建了一个分区数为 4、副本因子为 2 的主题。示例中的环境是一个包含 3个broker节点的集群,每个节点的名称和brokerId的对照关系如下:
在执行完脚本之后,Kafka会在log.dir或log.dirs参数所配置的目录下创建相应的主题分区,默认情况下这个目录为/tmp/kafka-logs/。我们来查看一下node1节点中创建的主题分区,参考如下:
可以看到 node1 节点中创建了 2 个文件夹 topic-create-0 和 topic-create-1,对应主题topic-create的2个分区编号为0和1的分区,命名方式可以概括为<topic>-<partition>。严谨地说,其实<topic>-<partition>这类文件夹对应的不是分区,分区同主题一样是一个逻辑的概念而没有物理上的存在。并且这里我们也只是看到了2个分区,而我们创建的是4个分区,其余2个分区被分配到了node2和node3节点中,参考如下:
三个broker节点一共创建了8个文件夹,这个数字8实质上是分区数4与副本因子2的乘积。每个副本(或者更确切地说应该是日志,副本与日志一一对应)才真正对应了一个命名形式如<topic>-<partition>的文件夹。
主题、分区、副本和 Log(日志)的关系如图 4-1 所示,主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切地说是Log层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的broker中,这样才能提供有效的数据冗余。对于示例中的分区数为4、副本因子为2、broker数为3的情况下,按照2、3、3的分区副本个数分配给各个broker是最优的选择。再比如在分区数为3、副本因子为3,并且broker数同样为3的情况下,分配3、3、3的分区副本个数给各个broker是最优的选择,也就是每个broker中都拥有所有分区的一个副本。
图4-1 主题、分区、副本和Log之间的关系
我们不仅可以通过日志文件的根目录来查看集群中各个broker的分区副本的分配情况,还可以通过ZooKeeper客户端来获取。当创建一个主题时会在ZooKeeper的/brokers/topics/目录下创建一个同名的实节点,该节点中记录了该主题的分区副本分配方案。示例如下:
示例数据中的"2":[1,2]表示分区 2 分配了 2 个副本,分别在 brokerId 为 1 和 2 的 broker节点中。
回顾一下1.3 节中提及的知识点:kafka-topics.sh脚本中的 zookeeper、partitions、replication-factor和topic这4个参数分别代表ZooKeeper连接地址、分区数、副本因子和主题名称。另一个 create 参数表示的是创建主题的指令类型,在 kafka-topics.sh 脚本中对应的还有list、describe、alter和delete这4个同级别的指令类型,每个类型所需要的参数也不尽相同。
还可以通过describe指令类型来查看分区副本的分配细节,示例如下:
示例中的Topic和Partition分别表示主题名称和分区号。PartitionCount表示主题中分区的个数,ReplicationFactor表示副本因子,而Configs表示创建或修改主题时指定的参数配置。Leader表示分区的leader副本所对应的brokerId,Isr表示分区的ISR集合,Replicas表示分区的所有的副本分配情况,即AR集合,其中的数字都表示的是brokerId。
使用kafka-topics.sh脚本创建主题的指令格式归纳如下:
到目前为止,创建主题时的分区副本都是按照既定的内部逻辑来进行分配的。kafka-topics.sh脚本中还提供了一个 replica-assignment 参数来手动指定分区副本的分配方案。replica-assignment参数的用法归纳如下:
这种方式根据分区号的数值大小按照从小到大的顺序进行排列,分区与分区之间用逗号“,”隔开,分区内多个副本用冒号“:”隔开。并且在使用replica-assignment参数创建主题时不需要原本必备的partitions和replication-factor这两个参数。
我们可以通过replica-assignment参数来创建一个与主题topic-create相同的分配方案的主题topic-create-same和不同的分配方案的主题topic-create-diff,示例如下:
注意同一个分区内的副本不能有重复,比如指定了0:0,1:1这种,就会报出AdminCommand-FailedException异常,示例如下:
如果分区之间所指定的副本数不同,比如0:1,0,1:0这种,就会报出AdminOperationException异常,示例如下:
当然,类似0:1,,0:1,1:0这种企图跳过一个分区的行为也是不被允许的,示例如下:
在创建主题时我们还可以通过config参数来设置所要创建主题的相关参数,通过这个参数可以覆盖原本的默认配置。在创建主题时可以同时设置多个参数,具体的用法归纳如下:
下面的示例使用了config参数来创建一个主题topic-config:
示例中设置了 cleanup.policy 参数为 compact,以及 max.message.bytes 参数为10000,这两个参数都是主题端的配置,我们再次通过 describe 指令来查看所创建的主题信息:
可以看到 Configs 一栏中包含了创建时所设置的参数。我们还可以通过 ZooKeeper 客户端查看所设置的参数,对应的ZooKeeper节点为/config/topics/[topic],示例如下:
创建主题时对于主题名称的命名方式也很有讲究。首先是不能与已经存在的主题同名,如果创建了同名的主题就会报错。我们尝试创建一个已经存在的主题topic-create,示例如下:
通过上面的示例可以看出,在发生命名冲突时会报出TopicExistsException的异常信息。在kafka-topics.sh 脚本中还提供了一个 if-not-exists 参数,如果在创建主题时带上了这个参数,那么在发生命名冲突时将不做任何处理(既不创建主题,也不报错)。如果没有发生命名冲突,那么和不带if-not-exists参数的行为一样正常创建主题。我们再次尝试创建一个已经存在的主题topic-create,示例如下:
通过上面的示例可以看出,在添加if-not-exists参数之后,并没有像第一次创建主题时的那样出现“Created topic "topic-create".”的提示信息。通过describe指令查看主题中的分区数和副本因子数,还是同第一次创建时的一样分别为 4 和 2,也并没有被覆盖,如此便证实了if-not-exists参数可以在发生命名冲突时不做任何处理。在实际应用中,如果不想在创建主题的时候跳出TopicExistsException的异常信息,不妨试一下这个参数。
kafka-topics.sh脚本在创建主题时还会检测是否包含“.”或“_”字符。为什么要检测这两个字符呢?因为在Kafka的内部做埋点时会根据主题的名称来命名metrics的名称,并且会将点号“.”改成下画线“_”。假设遇到一个名称为“topic.1_2”的主题,还有一个名称为“topic_1.2”的主题,那么最后的metrics的名称都会为“topic_1_2”,这样就发生了名称冲突。举例如下,首先创建一个以“topic.1_2”为名称的主题,提示 WARNING 警告,之后再创建“topic.1_2”时发生InvalidTopicException异常。
注意要点:主题的命名同样不推荐(虽然可以这样做)使用双下画线“__”开头,因为以双下画线开头的主题一般看作Kafka的内部主题,比如__consumer_offsets和__transaction_state。主题的名称必须由大小写字母、数字、点号“.”、连接线“-”、下画线“_”组成,不能为空,不能只有点号“.”,也不能只有双点号“..”,且长度不能超过249。
Kafka从0.10.x版本开始支持指定broker的机架信息(机架的名称)。如果指定了机架信息,则在分区副本分配时会尽可能地让分区副本分配到不同的机架上。指定机架信息是通过broker端参数broker.rack来配置的,比如配置当前broker所在的机架为“RACK1”:
如果一个集群中有部分broker指定了机架信息,并且其余的broker没有指定机架信息,那么在执行kafka-topics.sh脚本创建主题时会报出的AdminOperationException的异常,示例如下:
此时若要成功创建主题,要么将集群中的所有broker都加上机架信息或都去掉机架信息,要么使用disable-rack-aware参数来忽略机架信息,示例如下:
如果集群中的所有broker都有机架信息,那么也可以使用disable-rack-aware参数来忽略机架信息对分区副本的分配影响,有关分区副本的分配细节会在4.1.2节中做详细介绍。
本节开头就提及了 kafka-topics.sh 脚本实质上是调用了 kafka.admin.TopicCommand 类,通过向 TopicCommand 类中传入一些关键参数来实现主题的管理。我们也可以直接调用TopicCommand类中的main()函数来直接管理主题,比如这里创建一个分区数为1、副本因子为1的主题topic-create-api,如代码清单4-1所示。
代码清单4-1 使用TopicCommand创建主题
使用这种方式需要添加相应的Maven依赖:
可以看到这种方式与使用kafka-topics.sh脚本的方式并无太大差别,可以使用这种方式集成到自动化管理系统中来创建相应的主题。当然这种方式也可以适用于对主题的删、改、查等操作的实现,只需修改对应的参数即可。不过更推荐使用4.2节中介绍的KafkaAdminClient来代替这种实现方式。
4.1.2 分区副本的分配
4.1.1节中多处提及了分区副本的分配,读者对此或许有点迷惑,在生产者和消费者中也都有分区分配的概念。生产者的分区分配是指为每条消息指定其所要发往的分区,消费者中的分区分配是指为消费者指定其可以消费消息的分区,而这里的分区分配是指为集群制定创建主题时的分区副本分配方案,即在哪个broker中创建哪些分区的副本。
在创建主题时,如果使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;如果没有使用replica-assignment参数,那么就需要按照内部的逻辑来计算分配方案了。使用kafka-topics.sh脚本创建主题时的内部分配逻辑按照机架信息划分成两种策略:未指定机架信息和指定机架信息。如果集群中所有的 broker 节点都没有配置broker.rack参数,或者使用disable-rack-aware参数来创建主题,那么采用的就是未指定机架信息的分配策略,否则采用的就是指定机架信息的分配策略。
首先看一下未指定机架信息的分配策略,具体的实现涉及代码的逻辑细节,未指定机架信息的分配策略比较容易理解,这里通过源码来逐一进行分析。所对应的具体实现为kafka.admin.AdminUtils.scala文件中的assignReplicasToBrokersRackUnaware()方法,该方法的内容如下:
该方法参数列表中的fixedStartIndex和startPartitionId值是从上游的方法中调用传下来的,都是-1,分别表示第一个副本分配的位置和起始分区编号。assignReplicasToBrokersRackUnaware ()方法的核心是遍历每个分区 partition,然后从 brokerArray (brokerId 的列表)中选取replicationFactor个brokerId分配给这个partition。
该方法首先创建一个可变的Map用来存放该方法将要返回的结果,即分区partition和分配副本的映射关系。由于fixedStartIndex为-1,所以startIndex是一个随机数,用来计算一个起始分配的brokerId,同时又因为startPartitionId为-1,所以currentPartitionId的值为0,可见默认情况下创建主题时总是从编号为0的分区依次轮询进行分配。
nextReplicaShift表示下一次副本分配相对于前一次分配的位移量,从字面上理解有点绕口。举个例子:假设集群中有3个broker节点,对应于代码中的brokerArray,创建的某个主题中有3个副本和6个分区,那么首先从partitionId(partition的编号)为0的分区开始进行分配,假设第一次计算(由rand.nextInt(brokerArray.length)随机产生)得到的nextReplicaShift值为1,第一次随机产生的 startIndex 值为 2,那么 partitionId 为 0 的第一个副本的位置(这里指的是brokerArray的数组下标)firstReplicaIndex=(currentPartitionId+startIndex)%brokerArray.length=(0+2)%3=2,第二个副本的位置为replicaIndex(firstReplicaIndex,nextReplicaShift,j,brokerArray.length)=replicaIndex(2,nextReplicaShift+1,0,3)=?,这里引入了一个新的方法replicaIndex(),不过这个方法很简单,具体如下:
继续计算 replicaIndex(2,nextReplicaShift+1,0,3)=replicaIndex(2,2,0,3)=(2+(1+(2+0)%(3-1)))%3=0。继续计算下一个副本的位置replicaIndex(2,2,1,3)=(2+(1+(2+1)%(3-1)))%3=1。所以partitionId为0的副本分配位置列表为[2,0,1],如果brokerArray正好是从0开始编号的,也正好是顺序不间断的,即brokerArray为[0,1,2],那么当前partitionId为0的副本分配策略为[2,0,1]。如果brokerId不是从0开始的,也不是顺序的(有可能之前集群的其中几个broker下线了),最终的brokerArray为[2,5,8],那么partitionId为0的分区的副本分配策略为[8,2,5]。为了便于说明问题,可以简单假设brokerArray就是[0,1,2]。
同样计算下一个分区,即partitionId为1的副本分配策略。此时nextReplicaShift的值还是2,没有满足自增的条件。这个分区的 firstReplicaIndex=(1+2)%3=0。第二个副本的位置replicaIndex(0,2,0,3)=(0+(1+(2+0)%(3-1)))%3=1,第三个副本的位置replicaIndex(0,2,1,3)=2,最终partitionId为2的分区分配策略为[0,1,2]。
依次类推,更多的分配细节可以参考下面的示例,topic-test2的分区分配策略和上面陈述的一致:
我们无法预先获知startIndex和nextReplicaShift的值,因为都是随机产生的。startIndex和nextReplicaShift的值可以通过最终的分区分配方案来反推,比如上面的topic-test2,第一个分区(即partitionId=0的分区)的第一个副本为2,那么可由2=(0+startIndex)%3推断出startIndex为2。之所以startIndex选择随机产生,是因为这样可以在多个主题的情况下尽可能地均匀分布分区副本,如果这里固定为一个特定值,那么每次的第一个副本都是在这个broker上,进而导致少数几个broker所分配到的分区副本过多而其余broker分配到的分区副本过少,最终导致负载不均衡。尤其是某些主题的副本数和分区数都比较少,甚至都为1的情况下,所有的副本都落到了那个指定的broker上。与此同时,在分配时位移量nextReplicaShift也可以更好地使分区副本分配得更加均匀。
相比较而言,指定机架信息的分配策略比未指定机架信息的分配策略要稍微复杂一些,但主体思想并没相差很多,只是将机架信息作为附加的参考项。假设目前有3个机架rack1、rack2和rack3,Kafka集群中的9个broker点都部署在这3个机架之上,机架与broker节点的对照关系如下:
如果不考虑机架信息,那么对照assignReplicasToBrokersRackUnaware()方法里的brokerArray变量的值为[0,1,2,3,4,5 6,7,8]。指定基架信息的assignReplicasToBrokersRackAware()方法里的brokerArray的值在这里就会被转换为[0,3,6,1,4,7,2,5,8],显而易见,这是轮询各个机架而产生的结果,如此新的brokerArray(确切地说是arrangedBrokerList)中包含了简单的机架分配信息。之后的步骤也和assignReplicasToBrokersRackUnaware()方法类似,同样包含startIndex、currentPartiionId、nextReplicaShift 的概念,循环为每一个分区分配副本。分配副本时,除了处理第一个副本,其余的也调用 replicaIndex()方法来获得一个 broker,但这里和assignReplicasToBrokersRackUnaware()不同的是,这里不是简单地将这个broker添加到当前分区的副本列表之中,还要经过一层筛选,满足以下任意一个条件的broker不能被添加到当前分区的副本列表之中:
· 如果此broker所在的机架中已经存在一个broker拥有该分区的副本,并且还有其他的机架中没有任何一个broker拥有该分区的副本。
· 如果此broker中已经拥有该分区的副本,并且还有其他broker中没有该分区的副本。
当创建一个主题时,无论通过kafka-topics.sh脚本,还是通过其他方式(比如4.2节中介绍的KafkaAdminClient)创建主题时,实质上是在ZooKeeper中的/brokers/topics节点下创建与该主题对应的子节点并写入分区副本分配方案,并且在/config/topics/节点下创建与该主题对应的子节点并写入主题相关的配置信息(这个步骤可以省略不执行)。而Kafka创建主题的实质性动作是交由控制器异步去完成的,有关控制器的更多细节可以参考 6.4 节的相关内容。
知道了 kafka-topics.sh 脚本的实质之后,我们可以直接使用 ZooKeeper 的客户端在/brokers/topics节点下创建相应的主题节点并写入预先设定好的分配方案,这样就可以创建一个新的主题了。这种创建主题的方式还可以绕过一些原本使用kafka-topics.sh脚本创建主题时的一些限制,比如分区的序号可以不用从0开始连续累加了。首先我们通过ZooKeeper客户端创建一个除了与主题topic-create名称不同其余都相同的主题topic-create-zk,示例如下:
通过查看主题topic-create-zk的分配情况,可以看到与主题 topic-create 的信息没有什么差别。
我们再创建一个另类的主题,分配情况和主题 topic-create 一样,唯独分区号已经与主题topic-create-special大相径庭,示例如下:
可以看到分区号为10、21、33和40,而通过单纯地使用kafka-topics.sh脚本是无法实现的。不过这种方式也只是一些实战方面上的技巧,笔者还是建议使用更加正统的kafka-topics.sh脚本或KafkaAdminClient来管理相应的主题。
4.1.3 查看主题
4.1.1节中提及了kafka-topics.sh脚本有5种指令类型:create、list、describe、alter和delete。其中list和describe指令可以用来方便地查看主题信息,在前面的内容中我们已经接触过了describe指令的用法,本节会对其做更细致的讲述。
通过list指令可以查看当前所有可用的主题,示例如下:
前面的章节我们都是通过 describe 指令来查看单个主题信息的,如果不使用--topic指定主题,则会展示出所有主题的详细信息。--topic还支持指定多个主题,示例如下:
在使用 describe 指令查看主题信息时还可以额外指定 topics-with-overrides、under-replicated-partitions和unavailable-partitions这三个参数来增加一些附加功能。
增加topics-with-overrides参数可以找出所有包含覆盖配置的主题,它只会列出包含了与集群不一样配置的主题。注意使用topics-with-overrides参数时只显示原本只使用describe指令的第一行信息,参考示例如下:
under-replicated-partitions和unavailable-partitions参数都可以找出有问题的分区。通过 under-replicated-partitions 参数可以找出所有包含失效副本的分区。包含失效副本的分区可能正在进行同步操作,也有可能同步发生异常,此时分区的ISR集合小于 AR 集合。对于通过该参数查询到的分区要重点监控,因为这很可能意味着集群中的某个broker已经失效或同步效率降低等。有关失效副本的更多细节可以参阅8.1.1节。
举个例子,参照主题topic-create的环境,我们将集群中的node2节点下线,之后再通过这个参数来查看topic-create的信息,参考如下:
我们再将node2节点恢复,执行同样的命令,可以看到没有任何信息显示:
通过 unavailable-partitions 参数可以查看主题中没有 leader 副本的分区,这些分区已经处于离线状态,对于外界的生产者和消费者来说处于不可用的状态。
举个例子,参考主题topic-create的环境,我们将集群中的node2和node3节点下线,之后再通过这个参数来查看topic-create的信息,参考如下:
我们再将node2和node3恢复,执行同样的命令,可以看到没有任何信息:
4.1.4 修改主题
当一个主题被创建之后,依然允许我们对其做一定的修改,比如修改分区个数、修改配置等,这个修改的功能就是由kafka-topics.sh脚本中的alter指令提供的。
我们首先来看如何增加主题的分区数。以前面的主题topic-config为例,当前分区数为1,修改为3,示例如下:
注意上面提示的告警信息:当主题中的消息包含key时(即key不为null),根据key计算分区的行为就会受到影响。当topic-config的分区数为1时,不管消息的key为何值,消息都会发往这一个分区;当分区数增加到3时,就会根据消息的key来计算分区号,原本发往分区0的消息现在有可能会发往分区1或分区2。如此还会影响既定消息的顺序,所以在增加分区数时一定要三思而后行。对于基于key计算的主题而言,建议在一开始就设置好分区数量,避免以后对其进行调整。
目前Kafka只支持增加分区数而不支持减少分区数。比如我们再将主题topic-config的分区数修改为1,就会报出InvalidPartitionException的异常,示例如下:
为什么不支持减少分区?
按照Kafka现有的代码逻辑,此功能完全可以实现,不过也会使代码的复杂度急剧增大。实现此功能需要考虑的因素很多,比如删除的分区中的消息该如何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入现有的分区,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题,以及分区和副本的状态机切换问题都是不得不面对的。反观这个功能的收益点却是很低的,如果真的需要实现此类功能,则完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。
在创建主题时有一个if-not-exists参数来忽略一些异常,在这里也有对应的参数,如果所要修改的主题不存在,可以通过 if-exists 参数来忽略异常。下面修改一个不存在的主题topic-unknown的分区,会报出错误信息“Topic topic-unknown does not exist”,示例如下:
除了修改分区数,我们还可以使用kafka-topics.sh脚本的alter指令来变更主题的配置。在创建主题的时候我们可以通过config参数来设置所要创建主题的相关参数,通过这个参数可以覆盖原本的默认配置。在创建完主题之后,我们还可以通过alter指令配合config参数增加或修改一些配置以覆盖它们配置原有的值。
下面的示例中演示了将主题topic-config的max.message.bytes配置值从10000修改为20000,示例如下:
我们再次覆盖主题topic-config的另一个配置segment.bytes(看上去相当于增加动作),示例如下:
我们可以通过delete-config参数来删除之前覆盖的配置,使其恢复原有的默认值。下面的示例将主题topic-config中所有修改过的3个配置都删除:
注意到在变更(增、删、改)配置的操作执行之后都会提示一段告警信息,指明了使用kafka-topics.sh脚本的alter指令来变更主题配置的功能已经过时(deprecated),将在未来的版本中删除,并且推荐使用kafka-configs.sh脚本来实现相关功能。
4.1.5 配置管理
kafka-configs.sh 脚本是专门用来对配置进行操作的,这里的操作是指在运行状态下修改原有的配置,如此可以达到动态变更的目的。kafka-configs.sh脚本包含变更配置alter和查看配置describe这两种指令类型。同使用kafka-topics.sh脚本变更配置的原则一样,增、删、改的行为都可以看作变更操作,不过kafka-configs.sh脚本不仅可以支持操作主题相关的配置,还可以支持操作broker、用户和客户端这3个类型的配置。
kafka-configs.sh脚本使用entity-type参数来指定操作配置的类型,并且使用entity-name参数来指定操作配置的名称。比如查看主题topic-config的配置可以按如下方式执行:
--describe指定了查看配置的指令动作,--entity-type指定了查看配置的实体类型,--entity-name指定了查看配置的实体名称。entity-type只可以配置4个值:topics、brokers、clients和users,entity-type与entity-name的对应关系如表4-1所示。
表4-1 entity-type和entity-name的对应关系
使用alter指令变更配置时,需要配合add-config和delete-config这两个参数一起使用。add-config参数用来实现配置的增、改,即覆盖原有的配置;delete-config参数用来实现配置的删,即删除被覆盖的配置以恢复默认值。
下面的示例演示了 add-config 参数的用法,覆盖了主题 topic-config 的两个配置cleanup.policy和max.message.bytes(示例执行之前主题topic-config无任何被覆盖的配置):
上面示例中还使用了两种方式来查看主题topic-config中配置信息,注意比较这两者之间的差别。
使用delete-config参数删除配置时,同add-config参数一样支持多个配置的操作,多个配置之间用逗号“,”分隔,下面的示例中演示了如何删除上面刚刚增加的主题配置:
使用kafka-configs.sh脚本来变更(alter)配置时,会在ZooKeeper中创建一个命名形式为/config/<entity-type>/<entity-name>的节点,并将变更的配置写入这个节点,比如对于主题topic-config而言,对应的节点名称为/config/topics/topic-config,节点中的数据内容为:
可以推导出节点内容的数据格式为:
其中property-name代表属性名,property-value代表属性值。增加配置实际上是往节点内容中添加属性的键值对,修改配置是在节点内容中修改相应属性的属性值,删除配置是删除相应的属性键值对。
变更配置时还会在ZooKeeper中的/config/changes/节点下创建一个以“config_change_”为前缀的持久顺序节点(PERSISTENT_SEQUENTIAL),节点命名形式可以归纳为/config/changes/config_change_<seqNo>。比如示例中的主题topic-config与此对应的节点名称和节点内容如下:
seqNo是一个单调递增的10位数字的字符串,不足位则用0补齐。
查看(describe)配置时,就是从/config/<entity-type>/<entity-name>节点中获取相应的数据内容。如果使用 kafka-configs.sh 脚本查看配置信息时没有指定entity-name参数的值,则会查看entity-type所对应的所有配置信息。示例如下:
4.1.6 主题端参数
与主题相关的所有配置参数在 broker 层面都有对应参数,比如主题端参数 cleanup.policy对应broker层面的log.cleanup.policy。如果没有修改过主题的任何配置参数,那么就会使用broker端的对应参数作为其默认值。可以在创建主题时覆盖相应参数的默认值,也可以在创建完主题之后变更相应参数的默认值。比如在创建主题的时候没有指定cleanup.policy 参数的值,那么就使用 log.cleanup.policy 参数所配置的值作为cleanup.policy的值。
与主题相关的参数也有很多,由于篇幅限制,在前面的配置变更的示例中难以一一列出所有的参数,但是从配置变更的角度而言,其操作方式都是一样的。为了便于读者查阅,表 4-2列出了主题端参数与broker端参数的对照关系。
表4-2 主题端参数与broker端参数的对照关系
续表
续表
4.1.7 删除主题
如果确定不再使用一个主题,那么最好的方式是将其删除,这样可以释放一些资源,比如磁盘、文件句柄等。kafka-topics.sh脚本中的delete指令就可以用来删除主题,比如删除一个主题topic-delete:
可以看到在执行完删除命令之后会有相关的提示信息,这个提示信息和broker端配置参数delete.topic.enable 有关。必须将delete.topic.enable参数配置为true才能够删除主题,这个参数的默认值就是true,如果配置为false,那么删除主题的操作将会被忽略。在实际生产环境中,建议将这个参数的值设置为true。
如果要删除的主题是 Kafka 的内部主题,那么删除时就会报错。截至 Kafka 2.0.0,Kafka的内部一共包含2个主题,分别为__consumer_offsets和__transaction_state。下面的示例中尝试删除内部主题__consumer_offsets:
尝试删除一个不存在的主题也会报错。比如下面的示例中尝试删除一个不存在的主题topic-unknown:
这里同alter指令一样,也可以通过if-exists参数来忽略异常,参考如下:
使用kafka-topics.sh脚本删除主题的行为本质上只是在ZooKeeper中的/admin/delete_topics 路径下创建一个与待删除主题同名的节点,以此标记该主题为待删除的状态。与创建主题相同的是,真正删除主题的动作也是由Kafka的控制器负责完成的。
了解这一原理之后,我们可以直接通过ZooKeeper的客户端来删除主题。下面示例中使用ZooKeeper客户端zkCli.sh来删除主题topic-delete:
我们还可以通过手动的方式来删除主题。主题中的元数据存储在 ZooKeeper 中的/brokers/topics 和/config/topics 路径下,主题中的消息数据存储在 log.dir 或log.dirs配置的路径下,我们只需要手动删除这些地方的内容即可。下面的示例中演示了如何删除主题topic-delete,总共分3个步骤,第一步和第二步的顺序可以互换。
第一步,删除ZooKeeper中的节点/config/topics/topic-delete。
第二步,删除ZooKeeper中的节点/brokers/topics/topic-delete及其子节点。
第三步,删除集群中所有与主题topic-delete有关的文件。
注意,删除主题是一个不可逆的操作。一旦删除之后,与其相关的所有消息数据会被全部删除,所以在执行这一操作的时候也要三思而后行。
介绍到这里,基本上kafka-topics.sh脚本的使用也就讲完了,为了方便读者查阅,表4-3中列出了所有kafka-topics.sh脚本中的参数。读者也可以通过执行无任何参数的kafka-topics.sh脚本,或者执行kafka-topics.sh-help来查看帮助信息。
表4-3 kafka-topics.sh脚本中的参数
续表