极客时间已完结课程限时免费阅读

22 | 消费者组消费进度监控都怎么实现?

22 | 消费者组消费进度监控都怎么实现?-极客时间

22 | 消费者组消费进度监控都怎么实现?

讲述:胡夕

时长12:54大小11.81M

你好,我是胡夕。今天我要跟你分享的主题是:消费者组消费进度监控如何实现。
对于 Kafka 消费者来说,最重要的事情就是监控它们的消费进度了,或者说是监控它们消费的滞后程度。这个滞后程度有个专门的名称:消费者 Lag 或 Consumer Lag。
所谓滞后程度,就是指消费者当前落后于生产者的程度。比方说,Kafka 生产者向某主题成功生产了 100 万条消息,你的消费者当前消费了 80 万条消息,那么我们就说你的消费者滞后了 20 万条消息,即 Lag 等于 20 万。
通常来说,Lag 的单位是消息数,而且我们一般是在主题这个级别上讨论 Lag 的,但实际上,Kafka 监控 Lag 的层级是在分区上的。如果要计算主题级别的,你需要手动汇总所有主题分区的 Lag,将它们累加起来,合并成最终的 Lag 值。
我们刚刚说过,对消费者而言,Lag 应该算是最最重要的监控指标了。它直接反映了一个消费者的运行情况。一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。反之,如果一个消费者 Lag 值很大,通常就表明它无法跟上生产者的速度,最终 Lag 会越来越大,从而拖慢下游消息的处理速度。
更可怕的是,由于消费者的速度无法匹及生产者的速度,极有可能导致它消费的数据已经不在操作系统的页缓存中了。这样的话,消费者就不得不从磁盘上读取它们,这就进一步拉大了与生产者的差距,进而出现马太效应,即那些 Lag 原本就很大的消费者会越来越慢,Lag 也会越来越大。
鉴于这些原因,你在实际业务场景中必须时刻关注消费者的消费进度。一旦出现 Lag 逐步增加的趋势,一定要定位问题,及时处理,避免造成业务损失。
既然消费进度这么重要,我们应该怎么监控它呢?简单来说,有 3 种方法。
使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。
使用 Kafka Java Consumer API 编程。
使用 Kafka 自带的 JMX 监控指标。
接下来,我们分别来讨论下这 3 种方法。

Kafka 自带命令

我们先来了解下第一种方法:使用 Kafka 自带的命令行工具 bin/kafka-consumer-groups.sh(bat)。kafka-consumer-groups 脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具。当然,除了监控 Lag 之外,它还有其他的功能。今天,我们主要讨论如何使用它来监控 Lag。
如果只看名字,你可能会以为它只是操作和管理消费者组的。实际上,它也能够监控独立消费者(Standalone Consumer)的 Lag。我们之前说过,独立消费者就是没有使用消费者组机制的消费者程序。和消费者组相同的是,它们也要配置 group.id 参数值,但和消费者组调用 KafkaConsumer.subscribe() 不同的是,独立消费者调用 KafkaConsumer.assign() 方法直接消费指定分区。今天的重点不是要学习独立消费者,你只需要了解接下来我们讨论的所有内容都适用于独立消费者就够了。
使用 kafka-consumer-groups 脚本很简单。该脚本位于 Kafka 安装目录的 bin 子目录下,我们可以通过下面的命令来查看某个给定消费者的 Lag 值:
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>
Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值。我举个实际的例子来说明具体的用法,请看下面这张图的输出:
在运行命令时,我指定了 Kafka 集群的连接信息,即 localhost:9092。另外,我还设置了要查询的消费者组名:testgroup。kafka-consumer-groups 脚本的输出信息很丰富。首先,它会按照消费者组订阅主题的分区进行展示,每个分区一行数据;其次,除了主题、分区等信息外,它会汇报每个分区当前最新生产的消息的位移值(即 LOG-END-OFFSET 列值)、该消费者组当前最新消费消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前两者的差值)、消费者实例 ID、消费者连接 Broker 的主机名以及消费者的 CLIENT-ID 信息。
毫无疑问,在这些数据中,我们最关心的当属 LAG 列的值了,图中每个分区的 LAG 值大约都是 60 多万,这表明,在我的这个测试中,消费者组远远落后于生产者的进度。理想情况下,我们希望该列所有值都是 0,因为这才表明我的消费者完全没有任何滞后。
有的时候,你运行这个脚本可能会出现下面这种情况,如下图所示:
简单比较一下,我们很容易发现它和前面那张图输出的区别,即 CONSUMER-ID、HOST 和 CLIENT-ID 列没有值!如果碰到这种情况,你不用惊慌,这是因为我们运行 kafka-consumer-groups 脚本时没有启动消费者程序。请注意我标为橙色的文字,它显式地告诉我们,当前消费者组没有任何 active 成员,即没有启动任何消费者实例。虽然这些列没有值,但 LAG 列依然是有效的,它依然能够正确地计算出此消费者组的 Lag 值。
除了上面这三列没有值的情形,还可能出现的一种情况是该命令压根不返回任何结果。此时,你也不用惊慌,这是因为你使用的 Kafka 版本比较老,kafka-consumer-groups 脚本还不支持查询非 active 消费者组。一旦碰到这个问题,你可以选择升级你的 Kafka 版本,也可以采用我接下来说的其他方法来查询。

Kafka Java Consumer API

很多时候,你可能对运行命令行工具查询 Lag 这种方式并不满意,而是希望用程序的方式自动化监控。幸运的是,社区的确为我们提供了这样的方法。这就是我们今天要讲的第二种方法。
简单来说,社区提供的 Java Consumer API 分别提供了查询当前分区最新消息位移和消费者组最新消费消息位移两组方法,我们使用它们就能计算出对应的 Lag。
下面这段代码展示了如何利用 Consumer 端 API 监控给定消费者组的 Lag 值:
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断异常
// ...
return Collections.emptyMap();
} catch (ExecutionException e) {
// 处理ExecutionException
// ...
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
}
你不用完全了解上面这段代码每一行的具体含义,只需要记住 3 处地方即可:第 1 处是调用 AdminClient.listConsumerGroupOffsets 方法获取给定消费者组的最新消费消息的位移;第 2 处则是获取订阅分区的最新消息位移;最后 1 处就是执行相应的减法操作,获取 Lag 值并封装进一个 Map 对象。
我把这段代码送给你,你可以将 lagOf 方法直接应用于你的生产环境,以实现程序化监控消费者 Lag 的目的。不过请注意,这段代码只适用于 Kafka 2.0.0 及以上的版本,2.0.0 之前的版本中没有 AdminClient.listConsumerGroupOffsets 方法。

Kafka JMX 监控指标

上面这两种方式,都可以很方便地查询到给定消费者组的 Lag 信息。但在很多实际监控场景中,我们借助的往往是现成的监控框架。如果是这种情况,以上这两种办法就不怎么管用了,因为它们都不能集成进已有的监控框架中,如 Zabbix 或 Grafana。下面我们就来看第三种方法,使用 Kafka 默认提供的 JMX 监控指标来监控消费者的 Lag 值。
当前,Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标,里面有很多属性。和我们今天所讲内容相关的有两组属性:records-lag-max 和 records-lead-min,它们分别表示此消费者在测试窗口时间内曾经达到的最大的 Lag 值和最小的 Lead 值。
Lag 值的含义我们已经反复讲过了,我就不再重复了。这里的 Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大的话,Lead 就越小,反之也是同理
你可能会问,为什么要引入 Lead 呢?我只监控 Lag 不就行了吗?这里提 Lead 的原因就在于这部分功能是我实现的。开个玩笑,其实社区引入 Lead 的原因是,只看 Lag 的话,我们也许不能及时意识到可能出现的严重问题。
试想一下,监控到 Lag 越来越大,可能只会给你一个感受,那就是消费者程序变得越来越慢了,至少是追不上生产者程序了,除此之外,你可能什么都不会做。毕竟,有时候这也是能够接受的。但反过来,一旦你监测到 Lead 越来越小,甚至是快接近于 0 了,你就一定要小心了,这可能预示着消费者端要丢消息了。
为什么?我们知道 Kafka 的消息是有留存时间设置的,默认是 1 周,也就是说 Kafka 默认删除 1 周前的数据。倘若你的消费者程序足够慢,慢到它要消费的数据快被 Kafka 删除了,这时你就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形。这可能产生两个后果:一个是消费者从头消费一遍数据,另一个是消费者从最新的消息位移处开始消费,之前没来得及消费的消息全部被跳过了,从而造成丢消息的假象。
这两种情形都是不可忍受的,因此必须有一个 JMX 指标,清晰地表征这种情形,这就是引入 Lead 指标的原因。所以,Lag 值从 100 万增加到 200 万这件事情,远不如 Lead 值从 200 减少到 100 这件事来得重要。在实际生产环境中,请你一定要同时监控 Lag 值和 Lead 值。当然了,这个 lead JMX 指标的确也是我开发的,这一点倒是事实。
接下来,我给出一张使用 JConsole 工具监控此 JMX 指标的截图。从这张图片中,我们可以看到,client-id 为 consumer-1 的消费者在给定的测量周期内最大的 Lag 值为 714202,最小的 Lead 值是 83,这说明此消费者有很大的消费滞后性。
Kafka 消费者还在分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag 和 Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。
在我们的例子中,client-id 还是 consumer-1,主题和分区分别是 test 和 0。下图展示出了分区级别的 JMX 指标:
分区级别的 JMX 指标中多了 records-lag-avg 和 records-lead-avg 两个属性,可以计算平均的 Lag 值和 Lead 值。在实际场景中,我们会更多地使用这两个 JMX 指标。

小结

我今天完整地介绍了监控消费者组以及独立消费者程序消费进度的 3 种方法。从使用便捷性上看,应该说方法 1 是最简单的,我们直接运行 Kafka 自带的命令行工具即可。方法 2 使用 Consumer API 组合计算 Lag,也是一种有效的方法,重要的是它能集成进很多企业级的自动化监控工具中。不过,集成性最好的还是方法 3,直接将 JMX 监控指标配置到主流的监控框架就可以了。
在真实的线上环境中,我建议你优先考虑方法 3,同时将方法 1 和方法 2 作为备选,装进你自己的工具箱中,随时取出来应对各种实际场景。

开放讨论

请说说你对这三种方法的看法。另外,在真实的业务场景中,你是怎么监控消费者进度的呢?
欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。
分享给需要的人,Ta购买本课程,你将得20
生成海报并分享

赞 8

提建议

上一篇
21 | Java 消费者是如何管理TCP连接的?
下一篇
23 | Kafka副本机制详解
unpreview
 写留言

精选留言(56)

  • 刘丹
    2019-07-23
    对lead这个指标的含义还是不能理解。既然Lead 值是指消费者最新消费消息的位移与分区当前第一条消息的差值。好像这个值应该是越小越好,越小的话就意味着分区里未被消费的消息越少?

    作者回复: lead越小意味着consumer消费的消息越来越接近被删除的边缘,显然是不好的

    共 8 条评论
    36
  • 皇家救星
    2019-07-25
    前天用别人号提了一个问题,我今天找到答案了。问题:搭建了一个kafka单机服务器,消息收发正常。升级成3个服务器集群后消息能写但不能读(启动命令行消费工具后一直没输出,使用strace -f跟踪发现一直在epoll>超时>epoll的循环中),观察服务器日志无输出。定位问题过程:使用topic查询consumer_offest,发现分区leader的brokerid是0,但是我机器集群brokerid是1-3,这时才想起升级过程中把0号broker改成了1号(修改了logs和config下面的配置id)。尝试把1号改回0,重启即可正常消费。最终解决方法:使用kafka提供的工具,调整offset主题分布机器为1,2,3,生产消费即恢复正常
    展开
    26
  • zander
    2019-08-04
    在公司的kafka消费者监控上,经常可以看到lag 为一个负数,比如-3,-109等,想咨询一下,为什么会出现负数呢?

    作者回复: 可能出现丢失数据了。lag<0一定要重点关注

    共 5 条评论
    20
  • Guard-God
    2019-09-03
    老师您好,jmx 监控的lead我认为很重要,但是,我使用JConsole连接后怎么就没有kafka.consumer,这块呢?我测了好多版本,我想问一下,这个是如何配置的

    作者回复: 你连接的是Broker的JMX端口吧,你需要连接consumer的JMX端口

    共 5 条评论
    19
  • 风轻扬
    2020-05-20
    老师,我理解的lead会变小的原因是:kafka的消息会定期清除。所以分区第一条消息的offset值会变大。如果消费的速度慢,则current-offset的值会增长很难。此时lead的值就越来越小。是这样吗?

    作者回复: 是的

    共 2 条评论
    11
  • 小头针
    2019-07-30
    听胡老师这节课,最大的收获就是知道了Lead指标的重要性,之前就没注意到这个指标。 真实业务场景中,我们采用过kafka tool,kafka manager和jconsole和jvisualvm。 如果监控kafka主题、分区、数据流入流出、调整分区数、Lag等kafka manager比较直观好看,不推荐kafka tool,不方便会卡顿。 如果要监控内存、线程安全、进程,Lag、Lead等的话,就会使用jconsole和jvisualvm,另外,有利于排查kafka是否存在死锁问题。
    展开
    11
  • 注定非凡
    2019-11-06
    1 为什么要监控 A :对于Kafka消费者,最重要的事情就是监控它们的消费进度(消费的滞后程度)常称为:Consumer Lag B :Lag的单位是消息数,他直接反映了一个消费者的运行情况。一个正常的消费者的Lag应当很小,设置为0。这表明消费者能够及时地消费生产者生产出来的消息。反之,一个消费者Lag值很大的话表明它无法跟上生产者的速度。 C :如果消费者速度无法匹及生产者的数据,极有可能导致它消费的数据已经不在操作系统的页缓存中了,那些数据就失去了享有Zero Copy技术的条件,不得不从磁盘中读取,进一步拉大了与生产者的差距。并且会越来大。 所以:在实际业务场景中必须时刻关注消费者的消费进度。一旦出现Lag逐步增加的趋势,就要立即定位问题,及时处理,避免问题扩散。 2 如何监控 A :使用Kafka自带的命令行工具kafka-consumer-groups脚本 B :使用Kafka Java Conssumer API编程 C :使用Kafka自带的JMX监控指标 3 方法具体分析 A :Kafka自带命令 (1) kafka-consumer-groups脚本是kafka为我们提供的最直接的监控消费者消费进度工具。 (2) 使用: $ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息 > --describe --group <group 名称 > <Kafka broker 连接信息 >:主机:端口 <group 名称 > :要监控的消费组的 group.id值 (3)展示的信息:主题,分区,该消费者组最新消费消息的位移值(CURRENT-OFFSET值),每个分区当前最新生产的消息的位移值(LOG-END-OFFSET),LAG(前两者的差值),消费者实例ID,消费者连接Broker的主机名以及消费者的CLENT-ID信息。 B :Kafka Java Consumer API (1)首先获取给定的消费者组的最新消费消息的位移 (2)在获取订阅分区的最新消息位移 (3)最后执行相应的减法操作,获取Lag值并封装进一个Map对象。 C :Kafka JMX监控指标 使用Kafka默认提供 的JMX监控指标来监控消费者的Lag值。 (1)Kafka消费者提供了一个名为Kafka.consumer:type=consumer-fetch-manager-metrics,client-id=”{client-id}”的JMX指标。 (2)有两个重要的属性:records-lag-max 和 records-lead-min 分别表示消费者在测试窗口时间内曾经达到的最大的Lag值和最小的Lead值。 (3)Lead值是指消费者最新消费消息的位移和分区当前第一条消息的位移的差值。即:Lag越大,Lead就越小。
    展开
    10
  • 曾轼麟
    2019-12-12
    老师,请问你的这个消费者堆积监控代码,是否会触发消费者组的rebalance呢?

    作者回复: 不会引发,因为是用的AdminClient

    7
  • 天天向上
    2019-09-23
    分区第一条生产的消息偏移量 记为i1 分区最后一条生产的消息偏移量 记为i2 消费者在当前分区消费的最后一条消息的偏移量 记为i3 lead=i3-i1 lag=i2-i3 画条直线标记三个点就特别容易理解啦 lag越大说明 消费速度小于生产速度 lead越小 说明消费速度快临近删除速度啦 要特别小心啦 消费慢 如果消息不删除可以慢慢消费无大碍 消费慢 如果消息有删除策略 就要掂记着别过分的慢 还没来得及消费就删除啦 而消息中间件不可能不删除陈旧消息的 所以lead参数的监控更要注意
    展开
    共 1 条评论
    7
  • helloworld
    2020-03-15
    若该消费者消费了一个主题中的多个分区,那通过JMX方式的第一个JMX指标kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”角度如何理解,此时的两个监控项records-lag-max和records-lead-min分别代表什么含义?

    作者回复: 这个JMX是在整个consumer级别上统计,而不是在单个topic或topic分区上统计。records-lag-max即记录所有订阅分区上最大的lag。lead也是同理

    6
  • 灰机
    2019-12-16
    老师您好 理想情况下是不是lead值要等于offset-1?

    作者回复: lead = end of offset - log start offset。通常情况下log start offset不等于0

    5
  • hunterlodge
    2019-12-12
    老师,您的那段获取消费组的lag代码,是会导致新的consumer加入到消费组里,从而引起rebalance的吧?

    作者回复: 不会引发lag,因为用的是AdminClient

    5
  • 昀溪
    2019-10-12
    老师 怎么能够获取生产者 消费者和主题的关系呢。 我们想要梳理目前线上的 哪些生产者往哪些主题发消息以及哪些消费者消费某些主题。希望能快速获取并梳理出来 老师能不能给一些思路 谢谢

    作者回复: 目前无法获取到集群中生产者的数量信息,因为broker端并没有这方面的记录。消费者的话可以使用kafka-consumer-groups命令来间接获取

    共 3 条评论
    5
  • 陆攀
    2019-08-01
    老师,除了监控lag外,我还想监控消费的延迟时间。比如我想知道消费的当条数据的产生时间和当前最后一条数据的产生时间的差。这种有什么办法吗?

    作者回复: 消息里面有创建的时间戳,可以用程序自行计算

    4
  • wykkx
    2019-07-27
    老师请教下,使用bin/kafka-consumer-groups.sh这个方法,如果是独立的消费者,groupid那个地方要怎么填,刚才文章里还是没有说啊,谢谢。

    作者回复: 可以随意指定,不过要额外加上--partition参数

    共 2 条评论
    4
  • 小源哥
    2020-07-27
    leed指的是消费者的位移距离当前还没被删除的第一条消息的差值,可以这么理解吗

    作者回复: 可以的~

    3
  • helloworld
    2020-03-15
    我用的Kafka版本为kafka_2.11-2.1.1,Kafka Java 客户端版本为1.0.0发现只有kafka.consumer中的records-lag-max,没有records-lead-min;并且也没有分区相关的jmx信息

    作者回复: 这是客户端的JMX指标,进了2.0.0版本的客户端代码。详见https://issues.apache.org/jira/browse/KAFKA-6184

    共 2 条评论
    3
  • 柯察金
    2019-11-09
    感觉通过 javaAPI 的方式有问题啊。第一,引入 consumer 只是获取位移,可能会引起 rebalance。而且 consumer.endOffsets(consumedOffsets.keySet()); 这个方法,consumer 能获取到所有分区的位移信息吗?我一直以为是这样的:consumer 只能获取这个 consumer 所分配到的分区的位移

    作者回复: 第一、不会引入rebalance,因为没有subscribe; 第二、可以,因为AdminClient的listConsumerGroupOffsets访问所有分区

    3
  • 我已经设置了昵称
    2019-07-24
    我们用的kafka manager。有个疑问,kafka监控工具对于单条信息支持都不好,是kafka本身特性就不支持还是怎么样。rocketMQ支持的就很好
    共 1 条评论
    3
  • nightmare
    2019-07-23
    Lead的越小就代表可能会丢消息了吗?

    作者回复: 是啊,因为已经逼近最早的消息了

    共 2 条评论
    3