Kafka 4 Practical Series 005: Partition Migration Between Nodes

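Introduction

In this chapter, you will learn how to migrate topic partitions between the nodes of a Kafka cluster. When you add a node to an existing cluster, Kafka does not automatically move any partitions to the new node; you have to migrate them manually. The same applies to node failures: the partitions on the failed node are not automatically moved to the remaining nodes.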

In short
If Broker 3 goes down, the partitions on Broker 3 are not automatically moved to the available nodes, Broker 1 and Broker 2; the user has to migrate them manually.

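Partition migration

Kafka provides the kafka-reassign-partitions.sh script, which can generate, execute, and verify a migration plan (a partition reassignment).
Experiment 1: Create a migration plan that moves the partitions of the one-topic topic that reside on Broker 3 to Broker 1 and Broker 2.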

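# Define the topics to migrate in a JSON file (the topic name here is one-topic; JSON does not allow comments, so notes stay outside the file)
[root@kafka-node-001 kafka]# cat > topics-to-move.json << \EOF
{
    "topics": [
        {
            "topic": "one-topic"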
        }
    ],
    "version": 1
}
EOF
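
Standard JSON has no comment syntax, so a note such as `# topic name` inside the file would make it unparseable. The following is a minimal sketch, assuming Python 3 is available on the admin host, of generating the same file and checking that it parses. The helper names `write_topics_file` and `is_valid_json` are made up for illustration and are not part of Kafka.

```python
import json
import os
import tempfile


def write_topics_file(path, topics):
    """Write a topics-to-move file in the layout shown above."""
    doc = {"topics": [{"topic": name} for name in topics], "version": 1}
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(doc, fh, indent=4)
        fh.write("\n")
    return doc


def is_valid_json(text):
    """Return True if the text parses as standard JSON."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


if __name__ == "__main__":
    # Write to a temporary directory so nothing in the working tree is touched.
    target = os.path.join(tempfile.mkdtemp(), "topics-to-move.json")
    write_topics_file(target, ["one-topic"])
    with open(target, encoding="utf-8") as fh:
        print("generated file parses:", is_valid_json(fh.read()))
    # An inline '#' note breaks parsing.
    print("with inline note parses:",
          is_valid_json('{"topic": "one-topic" # topic name\n}'))
```

Running the same check against a hand-written file before passing it to --generate catches this kind of mistake early.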

# Generate the migration plan: move the one-topic topic to Broker 1 and Broker 2
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration # only Broker 1 and Broker 2 appear in the proposed plan
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
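
Before executing a plan, it helps to see which partitions actually gain or lose a broker. The sketch below compares the two JSON documents printed by --generate. The function names are hypothetical, and the embedded data is copied from the output above with `log_dirs` omitted for brevity.

```python
import json

# Assignments copied from the --generate output above (log_dirs omitted).
CURRENT_JSON = ('{"version":1,"partitions":['
                '{"topic":"one-topic","partition":0,"replicas":[3,1]},'
                '{"topic":"one-topic","partition":1,"replicas":[1,2]},'
                '{"topic":"one-topic","partition":2,"replicas":[2,3]},'
                '{"topic":"one-topic","partition":3,"replicas":[3,2]},'
                '{"topic":"one-topic","partition":4,"replicas":[2,1]},'
                '{"topic":"one-topic","partition":5,"replicas":[1,3]}]}')
PROPOSED_JSON = ('{"version":1,"partitions":['
                 '{"topic":"one-topic","partition":0,"replicas":[1,2]},'
                 '{"topic":"one-topic","partition":1,"replicas":[2,1]},'
                 '{"topic":"one-topic","partition":2,"replicas":[1,2]},'
                 '{"topic":"one-topic","partition":3,"replicas":[2,1]},'
                 '{"topic":"one-topic","partition":4,"replicas":[1,2]},'
                 '{"topic":"one-topic","partition":5,"replicas":[2,1]}]}')


def replica_map(plan_json):
    """Map (topic, partition) to its replica list."""
    plan = json.loads(plan_json)
    return {(p["topic"], p["partition"]): p["replicas"]
            for p in plan["partitions"]}


def replica_changes(current_json, proposed_json):
    """Return {(topic, partition): (added, removed)} for partitions whose broker set changes."""
    current = replica_map(current_json)
    proposed = replica_map(proposed_json)
    changes = {}
    for key, new in proposed.items():
        old = current.get(key, [])
        added = sorted(set(new) - set(old))
        removed = sorted(set(old) - set(new))
        if added or removed:
            changes[key] = (added, removed)
    return changes


if __name__ == "__main__":
    for (topic, part), (added, removed) in sorted(
            replica_changes(CURRENT_JSON, PROPOSED_JSON).items()):
        print(f"{topic}-{part}: add {added}, remove {removed}")
```

For this plan, partitions 0, 2, 3, and 5 each drop Broker 3 and gain one new replica. Partitions 1 and 4 only change replica order, which changes the preferred leader (the first replica in the list) but keeps the same set of brokers.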

# Create the migration plan: copy the proposed partition reassignment configuration above into this file
[root@kafka-node-001 kafka]# cat migration-plan-A.json
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}

# Execute the migration: move the partitions to Broker 1 and Broker 2 according to the plan file
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --reassignment-json-file migration-plan-A.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for one-topic-0,one-topic-1,one-topic-2,one-topic-3,one-topic-4,one-topic-5
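
The "Save this to use as the --reassignment-json-file option during rollback" hint refers to the JSON line printed just before it. As a sketch, assuming the --execute output has been captured as text (for example by redirecting it to a log file), the JSON can be extracted and written to a rollback file. The helper names are hypothetical, and the sample output is shortened to a single partition.

```python
import json
import os
import tempfile

# Shortened sample of --execute output (one partition only).
SAMPLE_OUTPUT = """Current partition replica assignment

{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for one-topic-0
"""


def extract_rollback_plan(execute_output):
    """Return the first JSON object line found in the output, parsed."""
    for line in execute_output.splitlines():
        line = line.strip()
        if line.startswith("{"):
            return json.loads(line)
    raise ValueError("no JSON assignment found in output")


def save_rollback_plan(execute_output, path):
    """Write the extracted assignment to path and return it."""
    plan = extract_rollback_plan(execute_output)
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(plan, fh, separators=(",", ":"))
        fh.write("\n")
    return plan


if __name__ == "__main__":
    target = os.path.join(tempfile.mkdtemp(), "rollback-plan.json")
    plan = save_rollback_plan(SAMPLE_OUTPUT, target)
    print("saved", len(plan["partitions"]), "partition(s) to", target)
```

Parsing the line with `json.loads` before saving also confirms that the captured text was not truncated or wrapped.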

# Migration progress: use --verify to check progress (under heavy node load the migration is slower and partitions are reported as in progress)
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --reassignment-json-file migration-plan-A.json --verify
Status of partition reassignment:
Reassignment of partition one-topic-0 is completed.
Reassignment of partition one-topic-1 is completed.
Reassignment of partition one-topic-2 is completed.
Reassignment of partition one-topic-3 is completed.
Reassignment of partition one-topic-4 is completed.
Reassignment of partition one-topic-5 is completed.

Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic one-topic
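
When a migration takes a while, the --verify output can be checked by a script instead of by eye. The sketch below lists the partitions that are not yet reported as completed. It assumes the line format shown above; because the exact wording of the unfinished status may vary between versions, any status other than `completed` is treated as pending. The function name is hypothetical.

```python
import re

# Matches lines such as "Reassignment of partition one-topic-0 is completed."
STATUS_LINE = re.compile(r"^Reassignment of partition (\S+) is (.+?)\.?$")

# Output copied from the --verify run above.
VERIFY_OUTPUT = """Status of partition reassignment:
Reassignment of partition one-topic-0 is completed.
Reassignment of partition one-topic-1 is completed.
Reassignment of partition one-topic-2 is completed.
Reassignment of partition one-topic-3 is completed.
Reassignment of partition one-topic-4 is completed.
Reassignment of partition one-topic-5 is completed.

Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic one-topic
"""


def pending_partitions(verify_output):
    """Return partitions whose reported status is anything other than 'completed'."""
    pending = []
    for line in verify_output.splitlines():
        match = STATUS_LINE.match(line.strip())
        if match and match.group(2) != "completed":
            pending.append(match.group(1))
    return pending


if __name__ == "__main__":
    left = pending_partitions(VERIFY_OUTPUT)
    print("all done" if not left else f"still pending: {left}")
```

A wrapper could rerun --verify at intervals until this list is empty.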

# Verify the result: all replicas are now on Broker 1 and Broker 2
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node-001:9092 --topic one-topic
Topic: one-topic      TopicId: gFMGRZuqRv6y4qXgvdkaFw PartitionCount: 6       ReplicationFactor: 2    Configs: min.insync.replicas=1,segment.bytes=1073741824
        Topic: one-topic      Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 1    Leader: 2       Replicas: 2,1   Isr: 1,2        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 2    Leader: 1       Replicas: 1,2   Isr: 2,1        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 4    Leader: 1       Replicas: 1,2   Isr: 2,1        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 5    Leader: 2       Replicas: 2,1   Isr: 1,2        Elr:    LastKnownElr: 
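
The same check can be scripted: parse the --describe output and confirm that no partition still lists Broker 3 as a replica. This is a sketch that assumes the column layout shown above. The function names are hypothetical, and the sample is copied from the output above with whitespace condensed.

```python
import re

# Captures partition id, leader, replica list, and ISR list from one row.
ROW = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(\S+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)")

DESCRIBE_OUTPUT = """Topic: one-topic TopicId: gFMGRZuqRv6y4qXgvdkaFw PartitionCount: 6 ReplicationFactor: 2 Configs: min.insync.replicas=1,segment.bytes=1073741824
    Topic: one-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr:
    Topic: one-topic Partition: 1 Leader: 2 Replicas: 2,1 Isr: 1,2 Elr: LastKnownElr:
    Topic: one-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 2,1 Elr: LastKnownElr:
    Topic: one-topic Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: LastKnownElr:
    Topic: one-topic Partition: 4 Leader: 1 Replicas: 1,2 Isr: 2,1 Elr: LastKnownElr:
    Topic: one-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 1,2 Elr: LastKnownElr:
"""


def parse_describe(text):
    """Return {partition: {'leader', 'replicas', 'isr'}} from --describe output."""
    rows = {}
    for line in text.splitlines():
        match = ROW.search(line)
        if match:
            rows[int(match.group(1))] = {
                "leader": match.group(2),  # kept as text; may not be numeric
                "replicas": [int(b) for b in match.group(3).split(",")],
                "isr": [int(b) for b in match.group(4).split(",") if b],
            }
    return rows


def partitions_on_broker(rows, broker_id):
    """Return the partitions that list broker_id among their replicas."""
    return sorted(p for p, r in rows.items() if broker_id in r["replicas"])


if __name__ == "__main__":
    rows = parse_describe(DESCRIBE_OUTPUT)
    print("partitions still on broker 3:", partitions_on_broker(rows, 3))
```

The topic summary line is skipped because it contains `PartitionCount:` rather than `Partition:`.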

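Rolling back a migration

We just moved the partitions on Broker 3 to Broker 1 and Broker 2. What if we want to undo that?
Experiment 2: Roll back the partitions of the one-topic topic that were just migrated off Broker 3, moving them from Broker 1 and Broker 2 back to Broker 3.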

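# When the migration was executed above, the tool printed a hint to save the JSON it displayed so that it can be used for a rollback.
# Recap of that earlier execution (migrating the partitions to Broker 1 and Broker 2 with the plan file):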
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --reassignment-json-file migration-plan-A.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}

# The JSON above is the replica assignment of each partition before the migration started.
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for one-topic-0,one-topic-1,one-topic-2,one-topic-3,one-topic-4,one-topic-5

# Create the rollback plan: copy the pre-migration partition assignment above into this file
[root@kafka-node-001 kafka]# cat rollback-plan.json
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}

# Execute the rollback
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --reassignment-json-file rollback-plan.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for one-topic-0,one-topic-1,one-topic-2,one-topic-3,one-topic-4,one-topic-5

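# Verify the rollback: compare the Leader, Replicas, and Isr IDs before the migration, after the migration, and after the rollback. Normally the values before the migration and after the rollback should be the same (the Leader can differ temporarily until a preferred leader election runs)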
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node-001:9092 --topic one-topic
Topic: one-topic      TopicId: gFMGRZuqRv6y4qXgvdkaFw PartitionCount: 6       ReplicationFactor: 2    Configs: min.insync.replicas=1,segment.bytes=1073741824
        Topic: one-topic      Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 2    Leader: 1       Replicas: 3,1   Isr: 1,3        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 3    Leader: 2       Replicas: 3,2   Isr: 2,3        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 4    Leader: 2       Replicas: 2,1   Isr: 2,1        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 5    Leader: 1       Replicas: 1,3   Isr: 1,3        Elr:    LastKnownElr: 

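Partition rebalancing

This approach covers two scenarios:

  1. Scaling out: the cluster grows from 3 nodes to 5, and the partitions are rebalanced (spread again across the different Broker nodes).
  2. Recovering a failed node: after Broker 3 recovers from a failure, its partitions are restored by rebalancing (first delete the data on the failed node, then rejoin the node to the existing cluster, and then run the rebalance).

# Define the topics to migrate in a JSON file (the topic name here is one-topic; JSON does not allow comments, so notes stay outside the file)
[root@kafka-node-001 kafka]# cat > topics-to-move.json << \EOF
{
    "topics": [
        {
            "topic": "one-topic"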
        }
    ],
    "version": 1
}
EOF

# Generate the migration plan: rebalance the one-topic topic across Brokers 1, 2, and 3
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}
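
To check that a proposed plan really spreads the load, count how many replicas and how many preferred leaders (the first replica in each list) each broker would hold. The sketch below does this for the two JSON documents above. The function name is hypothetical, and `log_dirs` is omitted from the embedded data for brevity.

```python
import json
from collections import Counter

# Assignments copied from the --generate output above (log_dirs omitted).
CURRENT_JSON = ('{"version":1,"partitions":['
                '{"topic":"one-topic","partition":0,"replicas":[1,2]},'
                '{"topic":"one-topic","partition":1,"replicas":[2,1]},'
                '{"topic":"one-topic","partition":2,"replicas":[1,2]},'
                '{"topic":"one-topic","partition":3,"replicas":[2,1]},'
                '{"topic":"one-topic","partition":4,"replicas":[1,2]},'
                '{"topic":"one-topic","partition":5,"replicas":[2,1]}]}')
PROPOSED_JSON = ('{"version":1,"partitions":['
                 '{"topic":"one-topic","partition":0,"replicas":[1,2]},'
                 '{"topic":"one-topic","partition":1,"replicas":[2,3]},'
                 '{"topic":"one-topic","partition":2,"replicas":[3,1]},'
                 '{"topic":"one-topic","partition":3,"replicas":[3,2]},'
                 '{"topic":"one-topic","partition":4,"replicas":[2,1]},'
                 '{"topic":"one-topic","partition":5,"replicas":[1,3]}]}')


def broker_load(plan_json):
    """Return (replicas per broker, preferred leaders per broker) as dicts."""
    plan = json.loads(plan_json)
    replicas = Counter()
    preferred = Counter()
    for partition in plan["partitions"]:
        replicas.update(partition["replicas"])
        preferred[partition["replicas"][0]] += 1
    return dict(replicas), dict(preferred)


if __name__ == "__main__":
    for label, doc in (("current", CURRENT_JSON), ("proposed", PROPOSED_JSON)):
        replicas, preferred = broker_load(doc)
        print(label, "replicas:", replicas, "preferred leaders:", preferred)
```

For this plan the 12 replicas go from 6 each on Brokers 1 and 2 to 4 on each of the three brokers, with 2 preferred leaders per broker.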

# Create the migration plan: copy the proposed partition reassignment configuration above into this file
[root@kafka-node-001 kafka]# cat > migration-plan-B.json << \EOF
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}
EOF

# Before rebalancing, all partitions are on Brokers 1 and 2
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node-001:9092 --topic one-topic
Topic: one-topic      TopicId: gFMGRZuqRv6y4qXgvdkaFw PartitionCount: 6       ReplicationFactor: 2    Configs: min.insync.replicas=1,segment.bytes=1073741824
        Topic: one-topic      Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 1    Leader: 2       Replicas: 2,1   Isr: 1,2        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 2    Leader: 1       Replicas: 1,2   Isr: 2,1        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 4    Leader: 1       Replicas: 1,2   Isr: 2,1        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 5    Leader: 2       Replicas: 2,1   Isr: 1,2        Elr:    LastKnownElr: 

# Execute the migration: move the partitions to Brokers 1, 2, and 3 according to the plan file
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --reassignment-json-file migration-plan-B.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for one-topic-0,one-topic-1,one-topic-2,one-topic-3,one-topic-4,one-topic-5

# Verify the result: the replicas are now spread across Brokers 1, 2, and 3
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node-001:9092 --topic one-topic
Topic: one-topic      TopicId: gFMGRZuqRv6y4qXgvdkaFw PartitionCount: 6       ReplicationFactor: 2    Configs: min.insync.replicas=1,segment.bytes=1073741824
        Topic: one-topic      Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 2    Leader: 1       Replicas: 3,1   Isr: 1,3        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 3    Leader: 2       Replicas: 3,2   Isr: 2,3        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 4    Leader: 1       Replicas: 2,1   Isr: 2,1        Elr:    LastKnownElr: 
        Topic: one-topic      Partition: 5    Leader: 1       Replicas: 1,3   Isr: 1,3        Elr:    LastKnownElr: 

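Notes

Whether you are migrating, upgrading, or rebalancing after adding or removing nodes, keep the following in mind:

  1. When to roll back: roll back only if you believe the current migration has a problem, for example it overloads the cluster, it is stuck, or the plan itself is wrong.
  2. Resource consumption: a rollback, like a migration, is resource intensive. It uses network bandwidth and disk I/O to copy data, so be careful in production.
  3. Save the original state: the original partition assignment JSON printed by --execute is what a one-step rollback relies on. The best practice is to redirect this output to a file every time you run a migration, in case you need it.
  4. What if the original state JSON is lost? Without that output things get more complicated, and a one-step rollback is no longer possible. You can still use the partition rebalancing approach above to spread the partitions across nodes again. You can also write the rollback JSON by hand and roll back with it; this is relatively difficult, but it consumes fewer resources.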
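
Writing the rollback JSON by hand, as mentioned in item 4, mostly means reproducing the layout the tool prints. As a sketch, assuming the original assignment is still known from somewhere else (for example an earlier --describe output), the helper below builds the file content from a partition-to-replicas mapping. `build_reassignment` is a made-up name, and the mapping is the pre-migration assignment from Experiment 1.

```python
import json


def build_reassignment(topic, assignment):
    """Build reassignment JSON from {partition: [broker ids]} in the tool's layout."""
    partitions = [
        {
            "topic": topic,
            "partition": partition,
            "replicas": list(replicas),
            # "any" leaves the choice of log directory to the broker.
            "log_dirs": ["any"] * len(replicas),
        }
        for partition, replicas in sorted(assignment.items())
    ]
    return json.dumps({"version": 1, "partitions": partitions},
                      separators=(",", ":"))


if __name__ == "__main__":
    # Pre-migration assignment of one-topic, as shown in Experiment 1.
    original = {0: [3, 1], 1: [1, 2], 2: [2, 3], 3: [3, 2], 4: [2, 1], 5: [1, 3]}
    print(build_reassignment("one-topic", original))
```

The printed string can be saved to a file and passed to --reassignment-json-file in the same way as rollback-plan.json.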

About 木子

WeChat: rockylinuxcn QQ: 2306867585
Founder of the Rocky Linux Chinese community, MVP, VMware vExpert, TVP, and advocate for cloud native technologies, with over ten years of experience in site reliability engineering (SRE) and DevOps. Passionate about cloud computing, microservices, CI/CD, DevOps, and Kubernetes; currently dedicated to promoting and implementing Rocky Linux in Chinese-speaking regions.