极客时间已完结课程限时免费阅读

19 | 消息队列:如何降低消息队列系统中消息的延迟?

19 | 消息队列:如何降低消息队列系统中消息的延迟?-极客时间

19 | 消息队列:如何降低消息队列系统中消息的延迟?

讲述:唐扬

时长12:16大小11.23M

你好,我是唐扬。
学完前面两节课之后,相信你对在垂直电商项目中如何使用消息队列应对秒杀时的峰值流量已经有所了解。当然了,你也应该知道要如何做才能保证消息不会丢失,尽量避免消息重复带来的影响。那么我想让你思考一下:除了这些内容,你在使用消息队列时还需要关注哪些点呢?
先来看一个场景:在你的垂直电商项目中,你会在用户下单支付之后向消息队列里面发送一条消息,队列处理程序消费了消息后会增加用户的积分或者给用户发送优惠券。用户在下单之后,等待几分钟或者十几分钟拿到积分和优惠券是可以接受的,但是一旦消息队列出现大量堆积,用户消费完成后几小时还拿到优惠券,那就会有用户投诉了。
这时你要关注的就是消息队列中消息的延迟了,这其实是消费性能的问题,那么你要如何提升消费性能保证更短的消息延迟呢?在我看来,首先需要掌握如何来监控消息的延迟,因为有了数据之后你才可以知道目前的延迟数据是否满足要求,也可以评估优化之后的效果。然后你要掌握使用消息队列的正确姿势以及关注消息队列本身是如何保证消息尽快被存储和投递的。
接下来,我们先来看看第一点:如何监控消息延迟。

如何监控消息延迟

在我看来,监控消息的延迟有两种方式:
使用消息队列提供的工具,通过监控消息的堆积来完成;
通过生成监控消息的方式来监控消息的延迟情况。
接下来,我带你实际了解一下。
假设在开篇的场景之下电商系统中的消息队列已经堆积了大量的消息,那么你要想监控消息的堆积情况,首先需要从原理上了解在消息队列中消费者的消费进度是多少,因为这样才方便计算当前的消费延迟是多少。比如生产者向队列中一共生产了 1000 条消息,某一个消费者消费进度是 900 条,那么这个消费者的消费延迟就是 100 条消息。
在 Kafka 中,消费者的消费进度在不同的版本上是不同的。
在 Kafka0.9 之前的版本中,消费进度是存储在 ZooKeeper 中的,消费者在消费消息的时候先要从 ZooKeeper 中获取最新的消费进度,再从这个进度的基础上消费后面的消息。
在 Kafka0.9 版本之后,消费进度被迁入到 Kakfa 的一个专门的 topic 叫“__consumer_offsets”里面。所以如果你了解 kafka 的原理,可以依据不同的版本从不同的位置获取到这个消费进度的信息。
当然,作为一个成熟的组件,Kafka 也提供了一些工具来获取这个消费进度的信息帮助你实现自己的监控,这个工具主要有两个:
首先,Kafka 提供了工具叫做“kafka-consumer-groups.sh”(它在 Kafka 安装包的 bin 目录下)。
为了帮助你理解,我简单地搭建了一个 Kafka 节点并且写入和消费了一些信息,然后我来使用命令看看消息累积情况,具体的命令如下:
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group
结果如下:
图中的前两列是队列的基本信息,包括话题名和分区名;
第三列是当前消费者的消费进度;
第四列是当前生产消息的总数;
第五列就是消费消息的堆积数(也就是第四列与第三列的差值)。
通过这个命令你可以很方便地了解消费者的消费情况。
第二个工具是 JMX。
Kafka 通过 JMX 暴露了消息堆积的数据,我在本地启动了一个 console consumer,然后使用 jconsole 连接 consumer 就可以看到 consumer 的堆积数据了(就是下图中红框里的数据)。这些数据你可以写代码来获取,这样也可以方便地输出到监控系统中,我比较推荐这种方式。
除了使用消息队列提供的工具以外,你还可以通过生成监控消息的方式来监控消息的延迟。具体怎么做呢?
你先定义一种特殊的消息,然后启动一个监控程序将这个消息定时地循环写入到消息队列中,消息的内容可以是生成消息的时间戳并且也会作为队列的消费者消费数据。业务处理程序消费到这个消息时直接丢弃掉,而监控程序在消费到这个消息时就可以和这个消息的生成时间做比较,如果时间差达到某一个阈值就可以向我们报警。
这两种方式都可以监控消息的消费延迟情况,而从我的经验出发,我比较推荐两种方式结合来使用。比如在实际项目中,我会优先在监控程序中获取 JMX 中的队列堆积数据做到 dashboard 报表中,同时也会启动探测进程确认消息的延迟情况是怎样的。
在我看来,消息的堆积是对于消息队列的基础监控,这是你无论如何都要做的。但是了解了消息的堆积情况并不能很直观地了解消息消费的延迟,你也只能利用经验来确定堆积的消息量到了多少才会影响到用户的体验;而第二种方式对于消费延迟的监控则更加直观,而且从时间的维度来做监控也比较容易确定报警阈值。
了解了消息延迟的监控方式之后,我们再来看看如何提升消息的写入和消费性能,这样才会让异步的消息得到尽快的处理。

减少消息延迟的正确姿势

想要减少消息的处理延迟,我们需要在消费端和消息队列两个层面来完成。
在消费端的目标是提升消费者的消息处理能力,你能做的是:
优化消费代码提升性能;
增加消费者的数量(这个方式比较简单)。
不过第二种方式会受限于消息队列的实现。如果消息队列使用的是 Kafka 就无法通过增加消费者数量的方式来提升消息处理能力。
因为在 Kafka 中,一个 Topic(话题)可以配置多个 Partition(分区),数据会被平均或者按照生产者指定的方式写入到多个分区中,那么在消费的时候,Kafka 约定一个分区只能被一个消费者消费,为什么要这么设计呢?在我看来,如果有多个 consumer(消费者)可以消费一个分区的数据,那么在操作这个消费进度的时候就需要加锁,可能会对性能有一定的影响。
所以说,话题的分区数量决定了消费的并行度,增加多余的消费者也是没有用处的,你可以通过增加分区来提高消费者的处理能力。
那么,如何在不增加分区的前提下提升消费能力呢?
虽然不能增加 consumer,但你可以在一个 consumer 中提升处理消息的并行度,所以可以考虑使用多线程的方式来增加处理能力:你可以预先创建一个或者多个线程池,在接收到消息之后把消息丢到线程池中来异步地处理,这样,原本串行的消费消息的流程就变成了并行的消费,可以提高消息消费的吞吐量,在并行处理的前提下,我们就可以在一次和消息队列的交互中多拉取几条数据,然后分配给多个线程来处理。
另外,在消费队列中数据的时候还需要注意消费线程空转的问题。
我是在测试自己写的一个消息中间件的时候发现的。当时,我发现运行消费客户端的进程会偶发地出现 CPU 跑满的情况,于是打印了 JVM 线程堆栈,找到了那个跑满 CPU 的线程。这个时候才发现,原来是消息队列中有一段时间没有新的消息,于是消费客户端拉取不到新的消息就会不间断地轮询拉取消息,这个线程就把 CPU 跑满了。
所以,你在写消费客户端的时候要考虑这种场景,拉取不到消息可以等待一段时间再来拉取,等待的时间不宜过长,否则会增加消息的延迟。我一般建议固定的 10ms~100ms,也可以按照一定步长递增,比如第一次拉取不到消息等待 10ms,第二次 20ms,最长可以到 100ms,直到拉取到消息再回到 10ms。
说完了消费端的做法之后,再来说说消息队列本身在读取性能优化方面做了哪些事情。
我曾经也做过一个消息中间件,在最初设计中间件的时候我主要从两方面考虑读取性能问题:
消息的存储;
零拷贝技术。
针对第一点,我最初在设计的时候为了实现简单,使用了普通的数据库来存储消息,但是受限于数据库的性能瓶颈,读取 QPS 只能到 2000,后面我重构了存储模块,使用本地磁盘作为存储介质。Page Cache 的存在就可以提升消息的读取速度,即使要读取磁盘中的数据,由于消息的读取是顺序的并且不需要跨网络读取数据,所以读取消息的 QPS 提升了一个数量级。
另外一个优化点是零拷贝技术,说是零拷贝,其实我们不可能消灭数据的拷贝,只是尽量减少拷贝的次数。在读取消息队列的数据的时候,其实就是把磁盘中的数据通过网络发送给消费客户端,在实现上会有四次数据拷贝的步骤:
1. 数据从磁盘拷贝到内核缓冲区;
2. 系统调用将内核缓存区的数据拷贝到用户缓冲区;
3. 用户缓冲区的数据被写入到 Socket 缓冲区中;
4. 操作系统再将 Socket 缓冲区的数据拷贝到网卡的缓冲区中。
操作系统提供了 Sendfile 函数可以减少数据被拷贝的次数。使用了 Sendfile 之后,在内核缓冲区的数据不会被拷贝到用户缓冲区而是直接被拷贝到 Socket 缓冲区,节省了一次拷贝的过程提升了消息发送的性能。高级语言中对于 Sendfile 函数有封装,比如说在 Java 里面的 java.nio.channels.FileChannel 类就提供了 transferTo 方法提供了 Sendfile 的功能。

课程小结

本节课我带你了解了如何提升消息队列的性能来降低消息消费的延迟,这里我想让你明确的重点是:
我们可以使用消息队列提供的工具,或者通过发送监控消息的方式来监控消息的延迟情况;
横向扩展消费者是提升消费处理能力的重要方式;
选择高性能的数据存储方式配合零拷贝技术,可以提升消息的消费性能。
其实队列是一种常用的组件,只要涉及到队列,任务的堆积就是一个不可忽视的问题,我遇到过的很多故障都是源于此。
比如前一段时间处理的一个故障,前期只是因为数据库性能衰减有少量的慢请求,结果这些慢请求占满了 Tomcat 线程池,导致整体服务的不可用。如果我们能对 Tomcat 线程池的任务堆积情况有实时的监控,或者说对线程池有一些保护策略,比方说线程全部使用之后丢弃请求,也许就会避免故障的发生。在此,我希望你在实际的工作中能够引以为戒,只要有队列就要监控它的堆积情况,把问题消灭在萌芽之中。

一课一思

在实际的项目中,你可能对于消息队列的使用已经很熟练了,那么结合今天的内容,你可以和我分享一下在研发过程中,你在降低消息延迟方面做过哪些事情呢?欢迎在留言区和我一起讨论,或者将你的实战经验分享给更多的人。
最后,感谢你的阅读,如果这篇文章让你有所收获,也欢迎你将它分享给更多的朋友。
分享给需要的人,Ta购买本课程,你将得18
生成海报并分享

赞 20

提建议

上一篇
18 | 消息投递:如何保证消息仅仅被消费一次?
下一篇
20 | 面试现场第二期:当问到项目经历时,面试官究竟想要了解什么?
 写留言

精选留言(33)

  • leechanx
    2019-11-04
    sendfile 在linux 2.1的时候,是文件与socket两个内核缓冲区间copy 在更高linux版本,已经能从文件缓冲区直接写到网卡了,socket缓冲区省掉了
    共 1 条评论
    89
  • 仙道
    2019-11-06
    老师,关于消息队列的一个重要东西,您还没有讲到。 消息的顺序性。比如订单消息要在配送消息消费之前消费 对于这种问题,有因果一致性来解决。因果一致性的一种做法是,消息的编号递增,小的号码先消费,大的号码要在后消费。 但是在分布式集群环境下,每个消费者消费的时候要把它的最大消息编号广播给其他消费者。 这样一来的话,代价就比较高了。
    展开

    作者回复: 也可以比如在Kafka下,把要求严格顺序的消息写到一个partition

    共 5 条评论
    25
  • 陆离
    2019-11-01
    并行消费那块有数据丢失的风险吧。 server接收到数据返回给队列ack,然后丢给线程池,数据还没处理完这个节点挂掉了,数据不就全丢了。

    作者回复: 会有这个风险,不过为了提升性能,需要一些权衡

    共 11 条评论
    18
  • 每天晒白牙
    2019-12-22
    今日学习总结 我们使用消息队列,一般会关注消息的延迟,然后优化消息的读写性能,但首先要做的是对延迟的监控,那怎么监控消息队列的延迟呢? 1.通过消息队列的工具,来监控消息的堆积情况 一般消息队列都会提供这种工具,拿kafka来说,可以通过 kafka-consumer-groups.sh 这个命令来监控堆积情况。主要关注【lag】列 同时kafka会通过jmx暴露消息的堆积情况,所以可以使用jconsole这种工具进行查看 2.通过生成监控消息的方式来监控延迟情况 这种方式就是启动一个监控程序,并生产一种特殊的消息,比如是消息生成的时间戳,然后循环的写入到消息队列中,真正的消费者对于这种特殊的消息不进行处理,而监控程序会处理这种特殊的消息,通过比较消费到该消息的时间和生成该消息的时间差来判断是否有延迟 那如何减少消息的延迟呢? 1.优化消费端的代码以提升性能 2.增加消费者的数量 关于第2点对消息队列会有限制,比如对于kafka来说,在同一个分区下,消息只能被一个消费者【同一个consumer_group】消费,这样也是考虑到了多个消费者消费需要抢占锁,会有性能损失 【topic的分区数量决定了消费者的并行度】 所以想提高并行度就得增加分区,那如何不增加分区呢?可以在处理消息上做工作了,比如可以在一个消费者中增加处理消息的并行度,比如多线程的方式去处理,或者加入异步处理 【这里有值得注意的,就是消费消息时消费线程的空转问题】 消费端通过不断轮训去topic中拉取消息,就存在线程空转的情况,这里可以增加一个策略就是拉取不到消息,等待一会儿,等待时间可以是固定的。也可以是阶梯的,当然这样可能加重消息消费延迟情况 消息队列本身读取性能提升可以从下面两个方面着手 1.消息的存储 2.零拷贝技术
    展开

    作者回复: 赞

    共 3 条评论
    16
  • Klaus
    2019-12-18
    数据库数据本身也是属于磁盘数据啊,为什么在存储的时候将数据库换成本地磁盘,qps会提高一个数量级呢?

    作者回复: 数据库是随机写,磁盘是顺序写

    共 4 条评论
    11
  • 2020-04-25
    MQ的一个核心卖点就是削峰填谷,一般来讲生产和消费必然存在速度差,而且常常把慢动作放在消费侧,有短暂积压应该是正常现象,如果长时间有大量积压,那就是消费侧过慢了,需要优化,具体怎么优化,也需要看具体性能瓶颈在哪里,不过核心思路在于,有性能瓶颈的地方最好可以通过水平扩展的方式来解决,否则就只能沿着网线找性能瓶颈点,然后把具体瓶颈给优化掉啦!一般应用是IO密集型,找有IO的地方基本能找到性能瓶颈。
    9
  • intomymind
    2019-11-01
    消费者接收到消息后,放入到线程池里面,那么在线程池里对消息处理完之后,如何实现返回ack给kafka呢

    作者回复: 为了性能可能要损失一些消息完整性

    共 4 条评论
    6
  • xiaochao321
    2020-06-23
    消息顺序性问题 rabbitmq使用一个队列对应一个消费者消费 kafka 相同key的消息放到同一个partition中,使用同一个内存队列进行消费 还要保证消费端是单线程的
    3
  • dondon
    2020-03-26
    Kafka 通过 JMX 暴露了消息堆积的数据,我在本地启动了一个 console consumer,然后使用 jconsole 连接 consumer 就可以看到 consumer 的堆积数据了(就是下图中红框里的数据)。 意思是说,生产环境Linux服务器上的kafka服务,我可以在本地写一个consumer订阅broker, 然后能通过jconsole可以看到kafka里的消息队列情况?

    作者回复: 可能生产环境不好用jconsole,不过可以写代码获取jmx的数据

    4
  •  海大
    2020-02-06
    消息处理时避免耗时操作也很重要

    作者回复: 是的,必要时可以做多级队列

    4
  • hunterlodge
    2019-11-26
    老师,请问怎么采集每个consumer的JMX数据呢,让监控程序去连接每一个consumer应用吗?

    作者回复: 是的呢~

    3
  • 你净瞎说~
    2019-11-05
    老师,我想问一下,消息中间件的读写都是顺序的吗?写文件的时候会先写到pageCache中,然后往文件写,写文件的时候应该是一行一行写的吧?如果不写入pageCache直接写入文件,也是一行一行写的吧,有什么区别吗?比如说写到文件的第1万行了,再往后继续写,是可以快速定位到这个位置的吗?读取消息的时候也是读的文件,假如说读到了第1万行,再往后读,也是可以很快定位到这个1万行吗?java有没有api,比如说,我直接读取第1万行的数据?
    展开

    作者回复: 写文件一般都会写入pagecache 由操作系统来刷盘

    共 3 条评论
    3
  • 电光火石
    2019-11-03
    零拷贝时,从内核缓冲区到socket缓冲区是否可以通过共享缓冲区的方式,再减少一次拷贝?谢谢!

    作者回复: 可以将描述符传输到socket缓冲区,这样可以减少一次拷贝

    共 2 条评论
    3
  • Omer
    2019-11-01
    你先定义一种特殊的消息,然后启动一个监控程序,将这个消息定时地循环写入到消息队列中,消息的内容可以是生成消息的时间戳,并且也会作为队列的消费者消费数据。业务处理程序消费到这个消息时直接丢弃掉,而监控程序在消费到这个消息时,就可以和这个消息的生成时间做比较,如果时间差达到某一个阈值就可以向我们报警。 老师,这里有个问题,你这么做是问了判断当前队列的延迟时间对吧,但是这样子做的话定时生成的时间戳和正式的任务混在一起了,这样不就给正常执行的业务处理程序增加了很多负担,会不会导致太多的这种任务导致对于正式的业务延迟更大?
    展开

    作者回复: 不会的,这样比如1分钟发送一个消息,相比于业务动辄每秒几十或者几百次请求来说太微不足道了

    2
  • 林腾
    2020-04-23
    老师好,关于IO和NIO有一个疑问,我在java进程中用IO和NIO分别实现文件下载服务的功能,分别计算服务器提供文件下载服务前后的时间,好像是差不多的。这是什么原因呢?

    作者回复: io负载不高吧

    1
  • jun.hai
    2019-12-21
    老师,请教个问题:我们用的ActiveMQ隔一段时间后会出现,生产后消费时隔半个小时到一个小时的延迟,并大量也不大,这个怎么检查能找到根源?谢谢

    作者回复: 可以监控一下写入消息和处理消息的速率,看看是不是写入消息量级过大还来不及处理,还是说处理的慢了 如果处理的慢了,那么要排查处理的链路上哪处是瓶颈

    共 2 条评论
    1
  • 李慢慢
    2019-11-20
    问一下,公司还在jdk6时代,但是用到了RocketMQ,上面的这些监控也能做吗·。

    作者回复: 可以的吧 ,RocketMQ也会提供一些工具

    1
  • 云师兄
    2019-11-08
    请教老师kafka新版本offset通过主题方式替换zk,能提升多大的吞吐量,有没有量化的数据参考

    作者回复: 没有找到官方的benchmark

    共 2 条评论
    1
  • 旅途
    2019-11-04
    我想问一下老师发送特殊消息队列的那个 正常的消息里面不是有时间戳吗 用这个时间戳比较不行吗

    作者回复: 也可以

    共 3 条评论
    1
  • longslee
    2019-11-02
    打卡。嗯文中监控实战不错。 然后就是下面很多同学提到的丢给线程池处理丢失数据和线程池本身的堆积问题。 如果线程池方式不能避免丢失,那么客户端是否只有一条条的消费来处理消息呢,此时线程数完全受控与Kafka的partition数?

    作者回复: 消息处理程序一般是IO密集型的,所以可以提升线程数来提升并行处理能力

    1