kafka扩容节点和partitions迁移

kafka的扩容难点:

1)主要在于增加机器之后,数据需要rebalance到新增的空闲节点,即把partitions迁移到空闲机器上。

kafka提供了bin/kafka-reassign-partitions.sh工具,完成parttition的迁移。

2)kafka的集群的数据量加大,数据rebalance的时间较长。解决办法是把log.retention.hours=1设置一小时(生产参数24小时)。

修改参数之后,重启kakfa节点,kafka会主动purge 1小时之前的log数据。

以下是kafka_0.8.1.1版本kafkka集群扩容操作记录,从16台物理机扩容到24台物理,partition数量由128个增加到192个:

<更新于2016.01.26> kafka支持参数的动态修改,下面单独设置某个topic的retention时间。

bin/kafka-topics.sh --zookeeper zookeeper-003:2181/kafka --alter --topic order_active --config retention.ms=86400000 

参考资料:http://kafka.apache.org/081/documentation.html#basic_ops_modify_topic

1、准备工作:

1) 验证kafka节点是否正常加入集群。

zkCli.sh > ls /kafka/brokers/ids/

2)检查新增机器的zk信息中主机名是否准确,每个broker都要检查

zkCli.sh > get /kafka/brokers/ids/1

2、purge数据,使数据迁移更加快速

1)替换retent.time,只保留最近一个小时的数据。主要是为了方面topic数据快速迁移。

sed -i 's/log.retention.hours=24/log.retention.hours=1/g' /apps/conf/kafka/server.properties

2)关闭kafka

apps/svr/jdk/bin/jps -ml |grep 'kafka.Kafka' | awk '{print $1}' |xargs kill -9

3)验证kafka进程

/apps/svr/jdk/bin/jps -ml |grep 'kafka.Kafka' | awk '{print $1}'

4)启动

/apps/svr/kafka/bin/kafka-server-start.sh -daemon /apps/conf/kafka/server.properties
/apps/svr/jdk/bin/jps -ml |grep 'kafka.Kafka' | awk '{print $1}'

5)观察kafka数据是够已经完成了数据purge

(1)通过server.log查看purge过程

(2)直观的观察data目录是够存在2个小时之前的日志data

3、增加partitions

因为增加节点,物理机器机器更多,需要增加partition的个数。

bin/kafka-topics.sh --zookeeper gd6-chenqun-zookeeper-003.idc.vip.com:2181/kafka --alter --topic all --partitions 192

4、重新分配parttion(reassign partitions)

1)获取所有的topic

/apps/svr/kafka/bin/kafka-topics.sh --list --zookeeper gd6-chenuqn-zookeeper-001.idc.vip.com:2181/kafka

2) reassign partitions**

生成需要迁移的topic partitions信息,broker-list为所有的节点,包括新增节点。

./bin/kafka-reassign-partitions.sh --broker-list "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24" --topics-to-move-json-file move.json --zookeeper gd6-chenqun-zookeeper-001.idc.vip.com:2181/kafka --generate

其中topics的json文件内容为:

vim move.json
{"topics":
[{"topic": "log"},{"topic": "trace"},{"topic": "titan"}],
"version":1
}

3)使用上一步生成的建议partition json内容进行完成迁移
“Proposed partition reassignment configuration”后面的内容保存到reassign.json文件中
….

bin/kafka-reassign-partitions.sh --broker-list "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24" --reassignment-json-file reassign.json --zookeeper gd6-chenqun-zookeeper-001.idc.vip.com:2181/kafka --execute

4) 检验partition的迁移状态

bin/kafka-reassign-partitions.sh   --verify  --zookeeper gd6-chenqun-zookeeper-001.idc.vip.com:2181/logviewkafka  
--reassignment-json-file reassign.json  |grep -v  successfully Status of partition reassignment: Reassignment of partition [all,43] is still in progress Reassignment of partition [all,11] is still in progress Reassignment of partition [all,107] is still in progress

5)修改参数,重启kafka

sed -i 's/log.retention.hours=1/log.retention.hours=24/g' /apps/conf/kafka/server.properties

6)验证partition的分布

./bin/kafka-topics.sh --zookeeper gd6-chenqun-zookeeper-003.idc.vip.com:2181/logviewkafka --topic all --describe
此条目发表在kafka分类目录,贴了, 标签。将固定链接加入收藏夹。

发表评论

邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据