Uneven broker load caused by heavy __consumer_offsets commit writes

1. Symptom: uneven request load across brokers

The monitoring system showed that on node kf19 both the broker's total message rate and the message rate of the single topic __consumer_offsets were much higher than on the other nodes.

[Monitoring chart: __consumer_offsets message rate per broker]

2. How consumer offsets are committed and written

The official documentation describes __consumer_offsets as follows:
When the offset manager receives an OffsetCommitRequest, it appends the request to a special compacted Kafka topic named __consumer_offsets. The offset manager sends a successful offset commit response to the consumer only after all the replicas of the offsets topic receive the offsets. In case the offsets fail to replicate within a configurable timeout, the offset commit will fail and the consumer may retry the commit after backing off. (This is done automatically by the high-level consumer.) The brokers periodically compact the offsets topic since it only needs to maintain the most recent offset commit per partition. The offset manager also caches the offsets in an in-memory table in order to serve offset fetches quickly.



The passage above is the official documentation's description of the internal topic __consumer_offsets. Before version 0.9, consumer offsets were committed to ZooKeeper; in large clusters the resulting volume of ZooKeeper transactions put considerable pressure on ZooKeeper, so offset commits were moved to the __consumer_offsets topic instead.

An offset commit written to Kafka looks like this (key::value, with the equivalent JSON representation shown below):

[cs-01,test,0]::[OffsetMetadata[6,NO_METADATA],CommitTime 1499672959153,ExpirationTime 2388705663153]


{
    "topic":"test",
    "partition":0,
    "group":"cs-01",
    "version":2,
    "offset":6,
    "metadata":"",
    "commitTimestamp":1499672959153,
    "expireTimestamp":2388705663153
}

3. Finding the consumer group id behind the abnormal offset commits

__consumer_offsets has 50 partitions by default. The describe output shows that kf19 is the leader of 3 of those partitions.
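
The leader distribution below can be listed with kafka-topics.sh, for example (a sketch, assuming ZooKeeper is reachable at 127.0.0.1:2181 and filtering for broker 19):

bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic __consumer_offsets |grep "Leader: 19"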

Topic: __consumer_offsets       Partition: 20   Leader: 19      Replicas: 20,34,35,18,19,15     Isr: 19
Topic: __consumer_offsets       Partition: 24   Leader: 19      Replicas: 19,38,39      Isr: 38,39,19
Topic: __consumer_offsets       Partition: 74   Leader: 19      Replicas: 19,39,40      Isr: 39,40,19

Comparing the sizes of the log segment files, partition 20 is clearly the largest.

$ du -sh /data*/kafka/*  |grep offsets
457M    /data0/kafka/__consumer_offsets-20
8.9M    /data0/kafka/__consumer_offsets-4
66M     /data0/kafka/__consumer_offsets-5
16M     /data0/kafka/__consumer_offsets-53
65M     /data0/kafka/__consumer_offsets-54
26M     /data1/kafka/__consumer_offsets-24
45M     /data1/kafka/__consumer_offsets-74

Since the commits are simply messages written to a topic, we can dump them with bin/kafka-simple-consumer-shell.sh and analyze the records.

Note that __consumer_offsets is an internal topic that consumers skip by default; to read it, disable the exclude.internal.topics setting in config/consumer.properties:

exclude.internal.topics=false

Dump the offsets data of partition 20 for analysis:

bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 20 --broker-list 127.0.0.1:9092 \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"  > /tmp/20.offset.txt

Extracting the consumer group id from each record shows that wolf-kafka-store-test is committing offsets at an abnormal rate, far more than any other group:

$ awk -F"," '{print $1}' /tmp/20.offset.txt |sed 's/\[//' | sort | uniq -c |sort  -nk1
    113 topic-motor-task-20180521
    120 magne-topic-mobius-api
    120 topic-bohr-task-20180521
    120 topic-naboo-manager-20180521
    180 magne-topic-durin-console
    200 topic-search-api-20180521
1004808 wolf-kafka-store-test

The analysis above largely follows the two reference articles listed at the end of this post.

4. Consumer offsets write strategy

1) __consumer_offsets has 50 partitions by default, and all offsets of a given consumer group are written to a single fixed partition, computed as follows:

// from kafka.coordinator.GroupMetadataManager; groupMetadataTopicPartitionCount defaults to 50 (offsets.topic.num.partitions)
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
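
To check where a given group's commits land, here is a minimal Scala sketch of the same computation (OffsetsPartitionCheck is a made-up name; math.abs stands in for Utils.abs, and 50 is assumed as the default offsets.topic.num.partitions). Per the findings above, it should print 20 for wolf-kafka-store-test:

object OffsetsPartitionCheck {
  // Same formula as partitionFor above, with the default of 50 partitions.
  def partitionFor(groupId: String, numPartitions: Int = 50): Int =
    math.abs(groupId.hashCode) % numPartitions

  def main(args: Array[String]): Unit =
    Seq("wolf-kafka-store-test", "topic-search-api-20180521").foreach { g =>
      println(s"$g -> __consumer_offsets partition ${partitionFor(g)}")
    }
}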

2) enable.auto.commit (default: true) and auto.commit.interval.ms (default: 5000 ms)
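
With auto commit enabled, the new consumer commits the offsets of all its assigned partitions from within poll() whenever the interval has elapsed, so a very small interval multiplies the write volume into __consumer_offsets. A minimal Scala sketch of such a consumer (Java client API; AutoCommitFlood is a made-up name, while the broker address, group id, and topic come from the examples in this post):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object AutoCommitFlood {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "127.0.0.1:9092")
    props.put("group.id", "wolf-kafka-store-test")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("enable.auto.commit", "true")     // default is already true
    props.put("auto.commit.interval.ms", "20")  // default 5000 ms; 20 ms means ~250x more frequent commits
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("test"))
    while (true) {
      // auto commit fires inside poll() once the interval has elapsed,
      // writing one record per assigned partition into __consumer_offsets
      consumer.poll(100L)
    }
  }
}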

3) Reproducing the problem: a console consumer with a very small auto.commit.interval.ms generates the same flood of offset commits:

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test \
--from-beginning --consumer-property group.id=20 --consumer-property auto.commit.interval.ms=20


[References]:

https://community.zuora.com/t5/Engineering-Blog/How-We-Triaged-Uneven-CPU-Load-in-our-Kafka-Cluster/ba-p/22016#

https://elang2.github.io/myblog/posts/2017-09-20-Kafak-And-Zookeeper-Offsets.html
