极客时间已完结课程限时免费阅读

24 | 请求是怎么被处理的?

24 | 请求是怎么被处理的?-极客时间

24 | 请求是怎么被处理的?

讲述:胡夕

时长12:32大小11.47M

你好,我是胡夕。今天我要和你分享的主题是:Kafka 请求是怎么被处理的。
无论是 Kafka 客户端还是 Broker 端,它们之间的交互都是通过“请求 / 响应”的方式完成的。比如,客户端会通过网络发送消息生产请求给 Broker,而 Broker 处理完成后,会发送对应的响应给到客户端。
Apache Kafka 自己定义了一组请求协议,用于实现各种各样的交互操作。比如常见的 PRODUCE 请求是用于生产消息的,FETCH 请求是用于消费消息的,METADATA 请求是用于请求 Kafka 集群元数据信息的。
总之,Kafka 定义了很多类似的请求格式。我数了一下,截止到目前最新的 2.3 版本,Kafka 共定义了多达 45 种请求格式。所有的请求都是通过 TCP 网络以 Socket 的方式进行通讯的
今天,我们就来详细讨论一下 Kafka Broker 端处理请求的全流程。

处理请求的 2 种常见方案

关于如何处理请求,我们很容易想到的方案有两个。
1.顺序处理请求。如果写成伪代码,大概是这个样子:
while (true) {
Request request = accept(connection);
handle(request);
}
这个方法实现简单,但是有个致命的缺陷,那就是吞吐量太差。由于只能顺序处理每个请求,因此,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统
2. 每个请求使用单独线程处理。也就是说,我们为每个入站请求都创建一个新的线程来异步处理。我们一起来看看这个方案的伪代码。
while (true) {
Request = request = accept(connection);
Thread thread = new Thread(() -> {
handle(request);});
thread.start();
}
这个方法反其道而行之,完全采用异步的方式。系统会为每个入站请求都创建单独的线程来处理。这个方法的好处是,它是完全异步的,每个请求的处理都不会阻塞下一个请求。但缺陷也同样明显。为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务。还是那句话,这个方法只适用于请求发送频率很低的业务场景。
既然这两种方案都不好,那么,Kafka 是如何处理请求的呢?用一句话概括就是,Kafka 使用的是 Reactor 模式

Kafka 是如何处理请求的?

谈到 Reactor 模式,大神 Doug Lea 的“Scalable IO in Java”应该算是最好的入门教材了。即使你没听说过 Doug Lea,那你应该也用过 ConcurrentHashMap 吧?这个类就是这位大神写的。其实,整个 java.util.concurrent 包都是他的杰作!
好了,我们说回 Reactor 模式。简单来说,Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景。我借用 Doug Lea 的一页 PPT 来说明一下 Reactor 的架构,并借此引出 Kafka 的请求处理模型。
Reactor 模式的架构如下图所示:
从这张图中,我们可以发现,多个客户端会发送请求给到 Reactor。Reactor 有个请求分发线程 Dispatcher,也就是图中的 Acceptor,它会将不同的请求下发到多个工作线程中处理。
在这个架构中,Acceptor 线程只是用于请求分发,不涉及具体的逻辑处理,非常得轻量级,因此有很高的吞吐量表现。而这些工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力。
如果我们来为 Kafka 画一张类似的图的话,那它应该是这个样子的:
显然,这两张图长得差不多。Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher,它也有对应的 Acceptor 线程和一个工作线程池,只不过在 Kafka 中,这个工作线程池有个专属的名字,叫网络线程池。Kafka 提供了 Broker 端参数 num.network.threads,用于调整该网络线程池的线程数。其默认值是 3,表示每台 Broker 启动时会创建 3 个网络线程,专门处理客户端发送的请求
Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。这种轮询策略编写简单,同时也避免了请求处理的倾斜,有利于实现较为公平的请求处理调度。
好了,你现在了解了客户端发来的请求会被 Broker 端的 Acceptor 线程分发到任意一个网络线程中,由它们来进行处理。那么,当网络线程接收到请求后,它是怎么处理的呢?你可能会认为,它顺序处理不就好了吗?实际上,Kafka 在这个环节又做了一层异步线程池的处理,我们一起来看一看下面这张图。
当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。
IO 线程池处中的线程才是执行请求逻辑的线程。Broker 端参数 num.io.threads 控制了这个线程池中的线程数。目前该参数默认值是 8,表示每台 Broker 启动后自动创建 8 个 IO 线程处理请求。你可以根据实际硬件条件设置此线程池的个数。
比如,如果你的机器上 CPU 资源非常充裕,你完全可以调大该参数,允许更多的并发请求被同时处理。当 IO 线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将 Response 返还给客户端。
细心的你一定发现了请求队列和响应队列的差别:请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。这么设计的原因就在于,Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方。
我们再来看看刚刚的那张图,图中有一个叫 Purgatory 的组件,这是 Kafka 中著名的“炼狱”组件。它是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。
讲到这里,Kafka 请求流程解析的故事其实已经讲完了,我相信你应该已经了解了 Kafka Broker 是如何从头到尾处理请求的。但是我们不会现在就收尾,我要给今天的内容开个小灶,再说点不一样的东西。

控制类请求和数据类请求分离

到目前为止,我提及的请求处理流程对于所有请求都是适用的,也就是说,Kafka Broker 对所有请求是一视同仁的。但是,在 Kafka 内部,除了客户端发送的 PRODUCE 请求和 FETCH 请求之外,还有很多执行其他操作的请求类型,比如负责更新 Leader 副本、Follower 副本以及 ISR 集合的 LeaderAndIsr 请求,负责勒令副本下线的 StopReplica 请求等。与 PRODUCE 和 FETCH 请求相比,这些请求有个明显的不同:它们不是数据类的请求,而是控制类的请求。也就是说,它们并不是操作消息数据的,而是用来执行特定的 Kafka 内部动作的。
Kafka 社区把 PRODUCE 和 FETCH 这类请求称为数据类请求,把 LeaderAndIsr、StopReplica 这类请求称为控制类请求。细究起来,当前这种一视同仁的处理方式对控制类请求是不合理的。为什么呢?因为控制类请求有这样一种能力:它可以直接令数据类请求失效!
我来举个例子说明一下。假设我们有个主题只有 1 个分区,该分区配置了两个副本,其中 Leader 副本保存在 Broker 0 上,Follower 副本保存在 Broker 1 上。假设 Broker 0 这台机器积压了很多的 PRODUCE 请求,此时你如果使用 Kafka 命令强制将该主题分区的 Leader、Follower 角色互换,那么 Kafka 内部的控制器组件(Controller)会发送 LeaderAndIsr 请求给 Broker 0,显式地告诉它,当前它不再是 Leader,而是 Follower 了,而 Broker 1 上的 Follower 副本因为被选为新的 Leader,因此停止向 Broker 0 拉取消息。
这时,一个尴尬的场面就出现了:如果刚才积压的 PRODUCE 请求都设置了 acks=all,那么这些在 LeaderAndIsr 发送之前的请求就都无法正常完成了。就像前面说的,它们会被暂存在 Purgatory 中不断重试,直到最终请求超时返回给客户端。
设想一下,如果 Kafka 能够优先处理 LeaderAndIsr 请求,Broker 0 就会立刻抛出 NOT_LEADER_FOR_PARTITION 异常,快速地标识这些积压 PRODUCE 请求已失败,这样客户端不用等到 Purgatory 中的请求超时就能立刻感知,从而降低了请求的处理时间。即使 acks 不是 all,积压的 PRODUCE 请求能够成功写入 Leader 副本的日志,但处理 LeaderAndIsr 之后,Broker 0 上的 Leader 变为了 Follower 副本,也要执行显式的日志截断(Log Truncation,即原 Leader 副本成为 Follower 后,会将之前写入但未提交的消息全部删除),依然做了很多无用功。
再举一个例子,同样是在积压大量数据类请求的 Broker 上,当你删除主题的时候,Kafka 控制器(我会在专栏后面的内容中专门介绍它)向该 Broker 发送 StopReplica 请求。如果该请求不能及时处理,主题删除操作会一直 hang 住,从而增加了删除主题的延时。
基于这些问题,社区于 2.3 版本正式实现了数据类请求和控制类请求的分离。其实,在社区推出方案之前,我自己尝试过修改这个设计。当时我的想法是,在 Broker 中实现一个优先级队列,并赋予控制类请求更高的优先级。这是很自然的想法,所以我本以为社区也会这么实现的,但后来我这个方案被清晰地记录在“已拒绝方案”列表中。
究其原因,这个方案最大的问题在于,它无法处理请求队列已满的情形。当请求队列已经无法容纳任何新的请求时,纵然有优先级之分,它也无法处理新的控制类请求了。
那么,社区是如何解决的呢?很简单,你可以再看一遍今天的第三张图,社区完全拷贝了这张图中的一套组件,实现了两类请求的分离。也就是说,Kafka Broker 启动后,会在后台分别创建两套网络线程池和 IO 线程池的组合,它们分别处理数据类请求和控制类请求。至于所用的 Socket 端口,自然是使用不同的端口了,你需要提供不同的 listeners 配置,显式地指定哪套端口用于处理哪类请求。

小结

讲到这里,Kafka Broker 请求处理流程的解析应该讲得比较完整了。明确请求处理过程的最大意义在于,它是你日后执行 Kafka 性能优化的前提条件。如果你能从请求的维度去思考 Kafka 的工作原理,你会发现,优化 Kafka 并不是一件困难的事情。

开放讨论

坦白来讲,我对社区否定优先级队列方案是有一点不甘心的。如果是你的话,你觉得应该如何规避优先级队列方案中队列已满的问题呢?
欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。
分享给需要的人,Ta购买本课程,你将得20
生成海报并分享

赞 31

提建议

上一篇
23 | Kafka副本机制详解
下一篇
25 | 消费者组重平衡全流程解析
unpreview
 写留言

精选留言(74)

  • 朱东旭
    2019-11-02
    胡老师您好,为什么有时候听到epoll,有时候听到reactor,这俩有啥区别。。

    作者回复: 不是一个层级的东西。epoll是一种IO模型,而Reactor是一种IO处理模式(IO pattern)。可以这么说:我们可以使用epoll来实现Reactor

    共 6 条评论
    49
  • 明翼
    2019-07-28
    有两种方法:1 是直接替换数据处理队列中的最前面的数据进行处理,处理完控制队列,再将这个消息插队到队头;2 双队列设计,不过双队列,如果先处理控制消息,如果一直来控制消息,数据队列的消息岂不会被延迟很大; 关于复制一套,我看了下面评论,我和部分网友的理解不一样,我觉得是复制一套网络线程持+中间队列+IO线程池;也就是有两个网络线程池,+2个中间队列,和2套IO线程持; 网络线程池作用将数据分发到中间队列,和接受IO线程池的处理结果回复给客户端。我理解为什么要加这个中间队列是为了将网络处理的线程数和IO处理的线程数解耦,达到高性能和资源少占用的目的。
    展开

    作者回复: 我觉得不错:)

    27
  • 注定非凡
    2019-11-07
    1 Apache Kafka 自己定义了组请求协议,用于实现各种交互操作。常见有: a. PRODUCE 请求用于生产消息 b. FETCH请求是用于消费消息 c. METADATA请求是用于请求Kafka集群元数据信息。 Kafka定义了很多类似的请求格式,所有的请求都是通过TCP网络以Socket的方式进行通讯的。 2 KaKfa Broker端处理请求的全流程 A :常用请求处理方案 a:顺序处理请求 实现方法简答,但吞吐量太差是致命缺陷。因为是顺序处理,每个请求都必须等待前一个请求处理完毕才能得到处理。这只适用于请求发送非常不频繁的系统。 b:每个请求使用单独线程处理 它是完全异步的,每个请求的处理都创建单独线程处理,但缺陷明显,为每个请求都创建线程开销极大,某些场景甚至会压垮整个服务。 B :Kafka的方案:使用Reactor模式 a:Reactor模式是JUC包作者的作品 b:Reactor模式是事件驱动架构的一种实现方式,特别适应用于处理多个客户端并发向服务端发送请求的场景。 3 Kafka的请求处理方式 A :Reactor模式中,多个客户端发送请求到Reactor。Reactor有个请求分发线程Dispatcher,它会将不同的请求下发到多个工作线程中处理。 Acceptor线程只用于请求分发,不涉及具体逻辑处理,因此有很高的吞吐量。而工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力。 B :kakfa中,Broker端有个SocketServer组件,类似于Reactor模式中的Dispatcher,他也有对应的Acceptor线程和一个工作线程池,在kafka中,被称为网络线程池。 Broker端参数num.network.threads,用于调整该网络线程池的线程数,默认为4,表示每台Broker启动时,会创建3个网络线程,专门处理客户端发送的请求。 C :Acceptor线程采用轮询的方式将入站请求公平的发送到所有网络线程中。 D :当网络线程接收到请求后,Kafka在这个环节又做了一层异步线程池的处理。 (1)当网络线程拿到请求后,她不是自己处理,而是将请求放入到一个共享请求队列中。 (2)Broker端还有个IO线程池,负责从该队列中取出请求,执行真正的处理。如果是PRODUCE生产请求,则将消息写入到底层的磁盘日志中;如果是FETCH请求,则从磁盘或页缓存中读取消息。 E :IO线程池中的线程是执行请求逻辑的线程。Broker端参数num.io.threads控制了这个线程数,默认为8,表示每台Broker启动后自动创建8个IO线程处理请求。 F :请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。原因在于Dispatcher只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送Repsone给客户端,所有这些Response没必要放在一个公共的地方。 G :Purgatory组件,专门用来缓存延时请求(Delayed Requset)。如设置了acks=all的PRODUCE请求,该请求要必须等待ISR中所有副本都接收了消息后才能返回,此时处理该请求的IO线程就必须瞪大其他Broker的写入结果。当请求不能立即处理时,他就会暂存在Purgatory中。待满足了完成条件,IO线程会继续处理该请求,并将Response放入到对应的网络线程的响应队列中 4 Kafka对请求的处理特点 A :Kafka Broker对所有的请求都是一视同仁的。 B :这些请求根据功能,可分为不同的请求类型。从业务的权重角度来讲,是有高低之分的,如控制类请求可以影响数据类请求。 C :无原则的平等,会造成混乱 社区采取的方案是,同时创建两套完全样的组件,实现两类请求的分离。
    展开
    共 1 条评论
    18
  • 蛋炒番茄
    2019-08-28
    请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属。为什么这样设计,原因没看懂。希望老师讲详细一点

    作者回复: 原因就是没必要放在一起,让各自线程自己发送就可以了。类似于:有了任务大家一起分,做完了自己单独汇报就行了

    共 9 条评论
    15
  • 拾光
    2019-12-03
    为什么不直接将Acceptor线程收到的请求存入共享队列,而要引入网络线程池来存?

    作者回复: 我认为就是单纯地想要在做一层生产者-消费者分离

    共 10 条评论
    12
  • MasterSong
    2020-05-13
    很自然的一种想法是在队列中预留部分空间给控制消息,比如队列空间达到95%时对于数据消息视作队列已满,但控制消息仍然可以入队

    作者回复: 的确是一种思路~

    10
  • ban
    2019-07-28
    老师,社区完全拷贝了这张图中的一套组件,实现了两类请求的分离。也就是说,Kafka Broker 启动后,会在后台分别创建网络线程池和 IO 线程池,它们分别处理数据类请求和控制类请求。 上面这段话不太懂,意思是说:分别建立两套组件(A套 网络线程池IO线程池:负责处理数据类请求)、(B套 网络线程池IO线程池:负责处理控制类请求),这样理解对吗?
    展开

    作者回复: 嗯嗯,差不多是这个意思

    10
  • 旭杰
    2020-02-19
    当 IO 线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将 Response 返还给客户端。这个响应队列不是网络线程池共享的吗?还是说IO线程指定网络线程来发送响应?

    作者回复: 每个网络线程创建自己的响应队列。 “IO线程指定网络线程来发送响应” --- 严格来说不算是IO线程指定的,因为Kafka会记录请求是被哪个网络线程接收的,因此发送response时还会发往那个网络线程。

    共 6 条评论
    9
  • 王帅
    2020-04-11
    老师,你好。我在使用kafka-2.11_0.11.0.1的时候遇到了一个问题,kafka服务端由于文件打开数过多导致出现too many open files重启。但是查看kafka端口使用情况只有1.3w+.通过lsof查看sock占用数达到了6W+。(我配置的最大链接数是65536)。查看监控发现是已分配的socket的数目比较高。能不能帮忙解答下。

    作者回复: 看一下是否存在大量的CLOSE_WAIT。之前在社区的确是碰到过这种海量CLOSE_WAIT撑爆了连接数的情形。如果是,目前除了无脑增加ulimit -n之外没有特别好的解决方案。

    共 4 条评论
    8
  • Mick
    2019-08-05
    老师麻烦帮我看下这个请求流程图我画的对不对?https://www.processon.com/view/link/5d481e6be4b07c4cf3031755

    作者回复: 我觉得挺好的:)

    共 3 条评论
    8
  • cricket1981
    2019-07-27
    双队列设计,分别存放数据类和控制类请求,每次先处理完所有控制类请求再处理数据类请求。
    共 3 条评论
    8
  • 梁胖
    2020-03-18
    "Kafka Broker 启动后,会在后台分别创建两套网络线程池和 IO 线程池的组合,它们分别处理数据类请求和控制类请求。至于所用的 Socket 端口,自然是使用不同的端口了,你需要提供不同的 listeners 配置,显式地指定哪套端口用于处理哪类请求" 胡老师您好,这句话我有个问题,请问:listeners配置中具体怎么显式地指定哪套端口用于处理哪类请求?

    作者回复: 当时在讲的时候社区还不是很完善,现在就比较肯定了。举个例子,你可以这样设定来控制control-plane请求由哪个监听器来控制: control.plane.listener.name=CONTROLLER listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL listeners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094

    6
  • Sunney
    2019-07-29
    老师您好,这两天做项目遇到一个问题想咨询一下,对于网络摄像头的视频流数据和抓拍到的照片数据,kafka应该如何传输呢?

    作者回复: 相同的方法,都要传输字节数组。你需要找到合适的方法把你的视频流数据或照片编码成字节序列。当然Kafka其实并不适合传输特别大的消息,因此你可以评估一下是否真的需求传视频本身?

    共 5 条评论
    6
  • 随心而至
    2021-01-14
    疑惑Kafka为什么没用已经实现了Reactor模式的netty网络应用框架,搜了下原因,一个是追求性能,一个是不想太多依赖。 https://www.quora.com/Why-did-Kafka-developers-prefer-to-implement-their-own-socket-server-instead-of-using-Netty-Does-that-help-with-performance-Does-Kafka-implement-such-features-already

    作者回复: 嗯嗯,确实有这方面的原因

    共 2 条评论
    5
  • 星之所在
    2020-05-14
    老师共享队列满了,那么这个请求会丢弃吧?

    作者回复: 默认是500的上限,如果满了会阻塞

    4
  • 2019-07-30
    我理解Acceptor是用来接收连接的(三次握手),连接成功后把读写请求的Socket提交到网络线程池,网络线程池中的线程通过Selector收到读请求后,从内核读取消息数据,然后再把待处理消息数据放入共享请求队列中。共享请求队列应该是多生产者多消费者模式(这里如何设计比较关键)。io线程池从共享请求队列中取出消息处理,处理完成再把响应提交到网络线程池中,由网络线程池发送至客户端。这里的共享请求队列为什么不直接使用io线程池自带的工作队列呢?另外控制类请求单独走不同线程池处理比较合理。
    展开
    4
  • 电光火石
    2019-07-27
    优先级队列方案,可以开两个队列,分别处理,前面的监听端口不需要重新构建,只是后面的处理线程不同即可。 另外,想问一下: 1. 为什么当时kafka做的时候,没有考虑使用netty作为通信框架? 2. 对IO这一块的处理比较感兴趣,老师可以介绍一下broker的入口类吗,想去看一下源码 谢谢了!
    展开

    作者回复: 1. Kafka社区当初主要是为了jar依赖的问题而选择不使用netty,转而使用Java NIO的 2. Broker入口类是kafka.server.KafkaServer.scala

    4
  • 灰色
    2021-08-24
    看了老师回答 “io线程池,异步处理消息,保证同一分区消息顺序一致性”的问题,感觉没有回答明白。 我这里在描述一下问题:一个producer向一个partition先后发了两条消息A和B,在共享队列里的顺序也是,先A后B,但是io线程有多个,这样消息A和消息B会被同时处理,而且很有可能,消息B先被处理完,保存到日志文件中,这让在日志中,就变成先B后A了,是不是就破坏了同一分区消息的有序性?
    展开
    共 7 条评论
    3
  • 一位不愿透漏姓名的足...
    2020-12-21
    https://my.oschina.net/u/3573545/blog/2221656 可以看下这篇文章,感觉和netty很相似~

    作者回复: 很不错的文章

    3
  • 小麦
    2020-10-11
    老师能讲一下具体怎么保证分区的顺序性的吗?我看评论里回复的: // 所以在clients端进行保证,通过特定的参数来控制请求的发送顺序 具体是怎么控制的?

    作者回复: 单个producer实例向单个分区下生产消息是保证有顺序的,如果在启用了retries之后设置max.in.flight.requests.per.connection = 1

    共 2 条评论
    3