Overview
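In this section you will learn how to migrate topic partitions between the nodes of a Kafka cluster. When you add nodes to an existing cluster, Kafka does not automatically move any existing partitions to the new nodes; you have to migrate them manually. The same is true when a node fails: leadership fails over to the surviving in-sync replicas, but the replicas that lived on the failed node are not automatically recreated on the remaining nodes.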
In short
If Broker 3 goes down, the partition replicas hosted on Broker 3 are not automatically moved to the surviving Broker 1 and Broker 2. You have to migrate them manually.
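When a broker goes down, the first step is to find out which partitions had a replica on it. The following is a minimal sketch, assuming the kafka-topics.sh --describe output format used throughout this article (whitespace-separated Topic:, Partition: and Replicas: fields). The function name partitions_on_broker is hypothetical and not part of Kafka.

```bash
# partitions_on_broker BROKER_ID
# Reads `kafka-topics.sh --describe` output on stdin and prints every
# partition (as topic-partition) whose Replicas list contains BROKER_ID.
partitions_on_broker() {
  awk -v b="$1" '
  {
    topic = ""; part = ""; reps = ""
    # Fields come in "Name: value" pairs; pick out the ones we need.
    for (i = 1; i < NF; i++) {
      if ($i == "Topic:")     topic = $(i + 1)
      if ($i == "Partition:") part  = $(i + 1)
      if ($i == "Replicas:")  reps  = $(i + 1)
    }
    # The per-topic summary line has no Partition/Replicas fields: skip it.
    if (part == "" || reps == "") next
    n = split(reps, ids, ",")
    for (j = 1; j <= n; j++)
      if (ids[j] == b) { print topic "-" part; break }
  }'
}

# Usage against the cluster in this article (Broker 3 is the failed node):
#   /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node-001:9092 \
#     --topic one-topic | partitions_on_broker 3
```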
Partition migration
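Kafka ships a partition reassignment tool, kafka-reassign-partitions.sh, which can generate a reassignment plan (--generate), execute it (--execute), and verify its progress (--verify).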
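The --generate mode reads the list of topics from a JSON file passed with --topics-to-move-json-file. JSON has no comment syntax, so the file should contain nothing but data. When several topics need to move, a small helper can write the file for you. This is a sketch; the helper name make_topics_json is hypothetical. It relies on Kafka topic names being limited to ASCII letters, digits, '.', '_' and '-', so no JSON escaping is needed.

```bash
# make_topics_json TOPIC...
# Prints a topics-to-move JSON document for --topics-to-move-json-file.
make_topics_json() {
  local first=1 t
  printf '{"topics":['
  for t in "$@"; do
    [ "$first" -eq 1 ] || printf ','   # comma between entries, not before the first
    printf '{"topic":"%s"}' "$t"
    first=0
  done
  printf '],"version":1}\n'
}

# Usage: write the file, then pass it to --generate:
#   make_topics_json one-topic > topics-to-move.json
```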
Experiment 1: Create a migration plan that moves the partitions of the topic one-topic that currently reside on Broker 3 onto Broker 1 and Broker 2.
# Define the topics to migrate in a JSON file (here only the topic one-topic)
[root@kafka-node-001 kafka]# cat > topics-to-move.json << \EOF
{
"topics": [
{
"topic": "one-topic"
}
],
"version": 1
}
EOF
# Generate a plan: propose an assignment of one-topic's partitions that uses only Broker 1 and Broker 2
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
Current partition replica assignment # the current replica assignment of each partition
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration # the proposed reassignment (only Broker 1 and Broker 2 appear)
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
# Create the plan file: copy the proposed reassignment above into this file
[root@kafka-node-001 kafka]# cat migration-plan-A.json
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
# Execute: move the partitions to Broker 1 and Broker 2 according to the plan file
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --reassignment-json-file migration-plan-A.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for one-topic-0,one-topic-1,one-topic-2,one-topic-3,one-topic-4,one-topic-5
# Check progress with --verify (on a heavily loaded cluster the migration is slower, and unfinished partitions are reported as in progress)
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --reassignment-json-file migration-plan-A.json --verify
Status of partition reassignment:
Reassignment of partition one-topic-0 is completed.
Reassignment of partition one-topic-1 is completed.
Reassignment of partition one-topic-2 is completed.
Reassignment of partition one-topic-3 is completed.
Reassignment of partition one-topic-4 is completed.
Reassignment of partition one-topic-5 is completed.
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic one-topic
# Check the result: all replicas are now on Broker 1 and Broker 2
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node-001:9092 --topic one-topic
Topic: one-topic TopicId: gFMGRZuqRv6y4qXgvdkaFw PartitionCount: 6 ReplicationFactor: 2 Configs: min.insync.replicas=1,segment.bytes=1073741824
Topic: one-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr:
Topic: one-topic Partition: 1 Leader: 2 Replicas: 2,1 Isr: 1,2 Elr: LastKnownElr:
Topic: one-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 2,1 Elr: LastKnownElr:
Topic: one-topic Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: LastKnownElr:
Topic: one-topic Partition: 4 Leader: 1 Replicas: 1,2 Isr: 2,1 Elr: LastKnownElr:
Topic: one-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 1,2 Elr: LastKnownElr:
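On a busy cluster, --verify may have to be run several times before every partition reports completed. The loop below repeats the check until no line mentions "in progress". It is a sketch that assumes unfinished partitions are reported with a line containing "in progress", as described in the --verify step above; wait_until_done and POLL_SECS are hypothetical names. Because the last --verify run happens after completion, it also clears the replication throttles, as in the "Clearing ... throttles" lines of the output above.

```bash
# wait_until_done COMMAND [ARGS...]
# Re-runs COMMAND (normally the --verify invocation) until its output no
# longer contains "in progress". Returns 1 if COMMAND itself fails.
# POLL_SECS (hypothetical, default 10) is the pause between checks.
wait_until_done() {
  local out
  while :; do
    out=$("$@") || return 1
    printf '%s\n' "$out"
    # No partition left in progress: the reassignment has finished.
    printf '%s\n' "$out" | grep -q 'in progress' || return 0
    sleep "${POLL_SECS:-10}"
  done
}

# Usage with the plan from Experiment 1:
#   wait_until_done /opt/kafka/bin/kafka-reassign-partitions.sh \
#     --bootstrap-server kafka-node-001:9092 \
#     --reassignment-json-file migration-plan-A.json --verify
```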
Rolling back a migration
We just moved the partitions off Broker 3 onto Broker 1 and Broker 2. What if we want to undo that?
Experiment 2: Roll back the partitions of one-topic that were just moved off Broker 3, moving them from Broker 1 and Broker 2 back to their original assignment, which includes Broker 3.
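# When --execute ran in Experiment 1, it printed the assignment from before the migration and suggested saving it for rollback.
# That output is repeated here for reference (there is no need to run the command again):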
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --reassignment-json-file migration-plan-A.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}
# The JSON above is the partition assignment as it was before the migration; this is what the rollback restores.
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for one-topic-0,one-topic-1,one-topic-2,one-topic-3,one-topic-4,one-topic-5
# Create the rollback plan: copy the pre-migration assignment printed above into this file
[root@kafka-node-001 kafka]# cat rollback-plan.json
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}
# Execute the rollback
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --reassignment-json-file rollback-plan.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for one-topic-0,one-topic-1,one-topic-2,one-topic-3,one-topic-4,one-topic-5
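# Check the rollback result: if you look closely, you can compare Leader, Replicas and Isr before the migration, after it, and after the rollback. After the rollback, Replicas should again match the pre-migration assignment in rollback-plan.json; Leader and the order of the Isr list may differ until leadership returns to the preferred replica.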
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node-001:9092 --topic one-topic
Topic: one-topic TopicId: gFMGRZuqRv6y4qXgvdkaFw PartitionCount: 6 ReplicationFactor: 2 Configs: min.insync.replicas=1,segment.bytes=1073741824
Topic: one-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr:
Topic: one-topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Elr: LastKnownElr:
Topic: one-topic Partition: 2 Leader: 1 Replicas: 3,1 Isr: 1,3 Elr: LastKnownElr:
Topic: one-topic Partition: 3 Leader: 2 Replicas: 3,2 Isr: 2,3 Elr: LastKnownElr:
Topic: one-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: LastKnownElr:
Topic: one-topic Partition: 5 Leader: 1 Replicas: 1,3 Isr: 1,3 Elr: LastKnownElr:
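Comparing the Replicas column against the rollback plan by eye is error-prone. The sketch below converts both the reassignment JSON and the --describe output into "topic-partition replicas" lines so they can be compared with diff. It assumes the compact JSON with keys in the order topic, partition, replicas, exactly as kafka-reassign-partitions.sh prints it in this article, and that --describe lists Replicas in assignment order. plan_replicas and actual_replicas are hypothetical helper names.

```bash
# plan_replicas: reads a reassignment JSON on stdin and prints one
# "topic-partition replicas" line per partition, sorted.
plan_replicas() {
  grep -o '"topic":"[^"]*","partition":[0-9]*,"replicas":\[[0-9,]*]' |
    sed 's/"topic":"\([^"]*\)","partition":\([0-9]*\),"replicas":\[\([0-9,]*\)]/\1-\2 \3/' |
    sort
}

# actual_replicas: reads `kafka-topics.sh --describe` output on stdin and
# prints the same "topic-partition replicas" format, sorted.
actual_replicas() {
  awk '{
    t = ""; p = ""; r = ""
    for (i = 1; i < NF; i++) {
      if ($i == "Topic:")     t = $(i + 1)
      if ($i == "Partition:") p = $(i + 1)
      if ($i == "Replicas:")  r = $(i + 1)
    }
    if (p != "" && r != "") print t "-" p " " r
  }' | sort
}

# Usage (bash process substitution); empty diff output means the cluster
# matches the plan:
#   diff <(plan_replicas < rollback-plan.json) \
#        <(/opt/kafka/bin/kafka-topics.sh --describe \
#            --bootstrap-server kafka-node-001:9092 --topic one-topic | actual_replicas)
```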
Partition rebalancing
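This procedure covers two scenarios:
- Scaling out: after expanding the cluster from 3 nodes to 5, rebalance the partitions so that they are spread across all brokers, including the new ones.
- Recovering a failed node: once Broker 3 is healthy again, move its share of the partitions back to it (delete the stale data on the failed node first, add the node back to the existing cluster, then run the rebalance).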
# Define the topics to migrate in a JSON file (here only the topic one-topic)
[root@kafka-node-001 kafka]# cat > topics-to-move.json << \EOF
{
"topics": [
{
"topic": "one-topic"
}
],
"version": 1
}
EOF
# Generate a plan: rebalance one-topic across Broker 1, 2 and 3
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}
# Create the plan file: copy the proposed reassignment above into this file
[root@kafka-node-001 kafka]# cat > migration-plan-B.json << \EOF
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[1,3],"log_dirs":["any","any"]}]}
EOF
# Before the rebalance, all partitions are on Broker 1 and 2
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node-001:9092 --topic one-topic
Topic: one-topic TopicId: gFMGRZuqRv6y4qXgvdkaFw PartitionCount: 6 ReplicationFactor: 2 Configs: min.insync.replicas=1,segment.bytes=1073741824
Topic: one-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr:
Topic: one-topic Partition: 1 Leader: 2 Replicas: 2,1 Isr: 1,2 Elr: LastKnownElr:
Topic: one-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 2,1 Elr: LastKnownElr:
Topic: one-topic Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: LastKnownElr:
Topic: one-topic Partition: 4 Leader: 1 Replicas: 1,2 Isr: 2,1 Elr: LastKnownElr:
Topic: one-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 1,2 Elr: LastKnownElr:
# Execute: move the partitions onto Broker 1, 2 and 3 according to the plan file
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server kafka-node-001:9092 --reassignment-json-file migration-plan-B.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"one-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one-topic","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"one-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for one-topic-0,one-topic-1,one-topic-2,one-topic-3,one-topic-4,one-topic-5
# Check the result: the replicas are now spread across Broker 1, 2 and 3
[root@kafka-node-001 kafka]# /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node-001:9092 --topic one-topic
Topic: one-topic TopicId: gFMGRZuqRv6y4qXgvdkaFw PartitionCount: 6 ReplicationFactor: 2 Configs: min.insync.replicas=1,segment.bytes=1073741824
Topic: one-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr:
Topic: one-topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Elr: LastKnownElr:
Topic: one-topic Partition: 2 Leader: 1 Replicas: 3,1 Isr: 1,3 Elr: LastKnownElr:
Topic: one-topic Partition: 3 Leader: 2 Replicas: 3,2 Isr: 2,3 Elr: LastKnownElr:
Topic: one-topic Partition: 4 Leader: 1 Replicas: 2,1 Isr: 2,1 Elr: LastKnownElr:
Topic: one-topic Partition: 5 Leader: 1 Replicas: 1,3 Isr: 1,3 Elr: LastKnownElr:
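After a rebalance it is worth checking how replicas and leaders are spread. The following is a minimal sketch, assuming the --describe output format shown above; broker_load is a hypothetical name. Applied to the output above, it reports 4 replicas on each of Broker 1, 2 and 3, but no partition led by Broker 3, even though Broker 3 is the first (preferred) replica of partitions 2 and 3. Leadership usually moves back to the preferred replica on its own when auto.leader.rebalance.enable is on (the default); the usage comment shows how to trigger a preferred leader election with kafka-leader-election.sh (Kafka 2.4 and later).

```bash
# broker_load: reads `kafka-topics.sh --describe` output on stdin and prints
# "BROKER REPLICAS LEADERS" per broker, sorted by broker id.
broker_load() {
  awk '{
    leader = ""; reps = ""
    for (i = 1; i < NF; i++) {
      if ($i == "Leader:")   leader = $(i + 1)
      if ($i == "Replicas:") reps   = $(i + 1)
    }
    if (reps == "") next              # skip the per-topic summary line
    n = split(reps, ids, ",")
    for (j = 1; j <= n; j++) { replicas[ids[j]]++; seen[ids[j]] = 1 }
    # A partition without a leader is not shown with a numeric id; count numeric ids only.
    if (leader ~ /^[0-9]+$/) { leaders[leader]++; seen[leader] = 1 }
  }
  END {
    for (b in seen) printf "%s %d %d\n", b, replicas[b], leaders[b]
  }' | sort -n
}

# Usage:
#   /opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka-node-001:9092 \
#     --topic one-topic | broker_load
# If leaders are skewed, trigger a preferred leader election for all partitions:
#   /opt/kafka/bin/kafka-leader-election.sh --bootstrap-server kafka-node-001:9092 \
#     --election-type PREFERRED --all-topic-partitions
```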
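Things to keep in mind
Whether you are migrating partitions, upgrading, or rebalancing after adding or removing nodes, keep the following in mind: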
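- When to roll back: roll back only if you believe the current migration is causing problems, for example it overloads the cluster, gets stuck, or the plan itself is wrong.
- Resource consumption: a rollback, like a migration, is resource-intensive, because it uses network bandwidth and disk I/O to copy data. Be careful when doing this in production.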
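- Why saving the original state matters:
  - The original partition assignment JSON printed by --execute is the only basis for a rollback. Best practice is to redirect that output to a file every time you run a migration, just in case.
  - What if you lost the original JSON? Things get more complicated, because a one-step rollback is no longer possible. You can use the partition rebalancing procedure to spread the partitions across the brokers again, or write a rollback JSON by hand. Writing it by hand takes considerably more effort, but it can consume fewer resources because only the partitions you list are moved.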
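The advice to save the original assignment can be automated. The sketch below wraps --execute, stores its full output in a log, and extracts the original assignment (the first non-empty line after "Current partition replica assignment", as in the outputs shown in this article) into a rollback file. It also passes --throttle, which limits the replication bandwidth used by the move in bytes per second and so addresses the resource-consumption concern; the throttle is only removed once --verify is run after the reassignment completes. execute_with_backup, KAFKA_BIN, BOOTSTRAP and the rollback-<plan file> naming are hypothetical.

```bash
# Hypothetical defaults; override them in the environment if your paths differ.
KAFKA_BIN="${KAFKA_BIN:-/opt/kafka/bin}"
BOOTSTRAP="${BOOTSTRAP:-kafka-node-001:9092}"

# execute_with_backup PLAN_FILE [THROTTLE_BYTES_PER_SEC]
# Runs --execute for PLAN_FILE (optionally with --throttle), keeps the full
# output in a timestamped log, and writes the original assignment printed by
# the tool to rollback-<PLAN_FILE>.
execute_with_backup() {
  local plan="$1" throttle="${2:-}" log backup
  log="execute-$(date +%Y%m%d%H%M%S).log"
  backup="rollback-$(basename "$plan")"

  set -- "$KAFKA_BIN/kafka-reassign-partitions.sh" --bootstrap-server "$BOOTSTRAP" \
    --reassignment-json-file "$plan" --execute
  if [ -n "$throttle" ]; then
    set -- "$@" --throttle "$throttle"   # replication throttle in bytes/sec
  fi

  # Show the output even if the command fails, then stop on failure.
  "$@" > "$log" 2>&1 || { cat "$log"; return 1; }
  cat "$log"

  # The original assignment is the first non-empty line after this header.
  awk 'found && NF { print; exit } /^Current partition replica assignment/ { found = 1 }' \
    "$log" > "$backup"
  if [ ! -s "$backup" ]; then
    echo "no original assignment found in $log" >&2
    return 1
  fi
  echo "rollback plan saved to $backup"
}

# Usage: move the partitions at about 50 MB/s, then run --verify until it
# reports completion so that the throttle is cleared:
#   execute_with_backup migration-plan-A.json 50000000
```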
Copyright notice: free to redistribute, with attribution, non-commercial use only, no derivatives, 3.0 International (CC BY-NC-ND 3.0).