kafka的扩容难点:
1)主要在于增加机器之后,数据需要rebalance到新增的空闲节点,即把partitions迁移到空闲机器上。
kafka提供了bin/kafka-reassign-partitions.sh工具,完成parttition的迁移。
2)kafka的集群的数据量加大,数据rebalance的时间较长。解决办法是把log.retention.hours=1设置一小时(生产参数24小时)。
修改参数之后,重启kakfa节点,kafka会主动purge 1小时之前的log数据。
以下是kafka_0.8.1.1版本kafkka集群扩容操作记录,从16台物理机扩容到24台物理,partition数量由128个增加到192个:
<更新于2016.01.26> kafka支持参数的动态修改,下面单独设置某个topic的retention时间。
bin/kafka-topics.sh --zookeeper zookeeper-003:2181/kafka --alter --topic order_active --config retention.ms=86400000
参考资料:http://kafka.apache.org/081/documentation.html#basic_ops_modify_topic
1、准备工作:
1) 验证kafka节点是否正常加入集群。
zkCli.sh > ls /kafka/brokers/ids/
2)检查新增机器的zk信息中主机名是否准确,每个broker都要检查
zkCli.sh > get /kafka/brokers/ids/1
2、purge数据,使数据迁移更加快速
1)替换retent.time,只保留最近一个小时的数据。主要是为了方面topic数据快速迁移。
sed -i 's/log.retention.hours=24/log.retention.hours=1/g' /apps/conf/kafka/server.properties
2)关闭kafka
apps/svr/jdk/bin/jps -ml |grep 'kafka.Kafka' | awk '{print $1}' |xargs kill -9
3)验证kafka进程
/apps/svr/jdk/bin/jps -ml |grep 'kafka.Kafka' | awk '{print $1}'
4)启动
/apps/svr/kafka/bin/kafka-server-start.sh -daemon /apps/conf/kafka/server.properties
/apps/svr/jdk/bin/jps -ml |grep 'kafka.Kafka' | awk '{print $1}'
5)观察kafka数据是够已经完成了数据purge
(1)通过server.log查看purge过程
(2)直观的观察data目录是够存在2个小时之前的日志data
3、增加partitions
因为增加节点,物理机器机器更多,需要增加partition的个数。
bin/kafka-topics.sh --zookeeper gd6-chenqun-zookeeper-003.idc.vip.com:2181/kafka --alter --topic all --partitions 192
4、重新分配parttion(reassign partitions)
1)获取所有的topic
/apps/svr/kafka/bin/kafka-topics.sh --list --zookeeper gd6-chenuqn-zookeeper-001.idc.vip.com:2181/kafka
2) reassign partitions**
生成需要迁移的topic partitions信息,broker-list为所有的节点,包括新增节点。
./bin/kafka-reassign-partitions.sh --broker-list "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24" --topics-to-move-json-file move.json --zookeeper gd6-chenqun-zookeeper-001.idc.vip.com:2181/kafka --generate
其中topics的json文件内容为:
vim move.json
{"topics":
[{"topic": "log"},{"topic": "trace"},{"topic": "titan"}],
"version":1
}
3)使用上一步生成的建议partition json内容进行完成迁移
“Proposed partition reassignment configuration”后面的内容保存到reassign.json文件中
….
bin/kafka-reassign-partitions.sh --broker-list "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24" --reassignment-json-file reassign.json --zookeeper gd6-chenqun-zookeeper-001.idc.vip.com:2181/kafka --execute
4) 检验partition的迁移状态
bin/kafka-reassign-partitions.sh --verify --zookeeper gd6-chenqun-zookeeper-001.idc.vip.com:2181/logviewkafka
--reassignment-json-file reassign.json |grep -v successfully Status of partition reassignment: Reassignment of partition [all,43] is still in progress Reassignment of partition [all,11] is still in progress Reassignment of partition [all,107] is still in progress
5)修改参数,重启kafka
sed -i 's/log.retention.hours=1/log.retention.hours=24/g' /apps/conf/kafka/server.properties
6)验证partition的分布
./bin/kafka-topics.sh --zookeeper gd6-chenqun-zookeeper-003.idc.vip.com:2181/logviewkafka --topic all --describe