21 | Java 消费者是如何管理TCP连接的?
21 | Java 消费者是如何管理TCP连接的?
讲述:胡夕
时长11:42大小10.71M
何时创建 TCP 连接?
创建多少个 TCP 连接?
何时关闭 TCP 连接?
可能的问题
小结
开放讨论
赞 11
提建议
精选留言(35)
- 常超2019-07-20整个生命周期里会建立4个连接,进入稳定的消费过程后,同时保持3个连接,以下是详细。 第一类连接:确定协调者和获取集群元数据。 一个,初期的时候建立,当第三类连接建立起来之后,这个连接会被关闭。 第二类连接:连接协调者,令其执行组成员管理操作。 一个 第三类连接:执行实际的消息获取。 两个分别会跟两台broker机器建立一个连接,总共两个TCP连接,同一个broker机器的不同分区可以复用一个socket。展开共 2 条评论43
- 注定非凡2019-11-061,何时创建 A :消费者和生产者不同,在创建KafkaConsumer实例时不会创建任何TCP连接。 原因:是因为生产者入口类KafkaProducer在构建实例时,会在后台启动一个Sender线程,这个线程是负责Socket连接创建的。 B :TCP连接是在调用KafkaConsumer.poll方法时被创建。在poll方法内部有3个时机创建TCP连接 (1)发起findCoordinator请求时创建 Coordinator(协调者)消费者端主键,驻留在Broker端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。 当消费者程序首次启动调用poll方法时,它需要向Kafka集群发送一个名为FindCoordinator的请求,确认哪个Broker是管理它的协调者。 (2)连接协调者时 Broker处理了消费者发来的FindCoordinator请求后,返回响应显式的告诉消费者哪个Broker是真正的协调者。 当消费者知晓真正的协调者后,会创建连向该Broker的socket连接。 只有成功连入协调者,协调者才能开启正常的组协调操作。 (3)消费数据时 消费者会为每个要消费的分区创建与该分区领导者副本所在的Broker连接的TCP. 2 创建多少 消费者程序会创建3类TCP连接: (1) :确定协调者和获取集群元数据 (2):连接协调者,令其执行组成员管理操作 (3) :执行实际的消息获取 3 何时关闭TCP连接 A :和生产者相似,消费者关闭Socket也分为主动关闭和Kafka自动关闭。 B :主动关闭指通过KafkaConsumer.close()方法,或者执行kill命令,显示地调用消费者API的方法去关闭消费者。 C :自动关闭指消费者端参数connection.max.idle.ms控制的,默认为9分钟,即如果某个socket连接上连续9分钟都没有任何请求通过,那么消费者会强行杀死这个连接。 D :若消费者程序中使用了循环的方式来调用poll方法消息消息,以上的请求都会被定期的发送到Broker,所以这些socket连接上总是能保证有请求在发送,从而实现“长连接”的效果。 E :当第三类TCP连接成功创建后,消费者程序就会废弃第一类TCP连接,之后在定期请求元数据时,会改为使用第三类TCP连接。对于一个运行了一段时间的消费者程序来讲,只会有后面两种的TCP连接。展开23
- AAA_叶子2019-12-18消费者tcp连接一旦断开,就会导致rebalance,实际开发过程中,是不是需要尽量保证长连接的模式?
作者回复: 嗯,如果就是要长时间的消费,维持一个长连接是不错的选择
共 3 条评论22 - taj39912019-07-22老师同一个消费组的客户端都只会连接到一个协调者吗?
作者回复: 是的。每个group都有一个与之对应的coordinator
18 - Williamzhang2019-07-22我觉得作者可以跟学员的留言互动,然后每期课后思考可以在下期中贴出答案及分析,其实留言讨论也是一个非常让人有收获的地方共 2 条评论12
- Geek_ab3d9a2020-03-17老师您好,请问k8s这样的容器平台,适合部署kafka的消费者吗?如果容器平台起了二个一模一样的消费者,对kafka来说会不会不知道自己通信的哪一个消费者? kafka通过什么来判断不同的客户端?
作者回复: 每个客户端至少主机名和端口是不一样的,我是指在TCP连接这个层面。另外对于producer而言,其实Kafka也不用区分它们,反正知道它们都再向集群发送消息就行了。对于consumer而言,主要还是看group.id的设置以确定它们是否在同一个group。最后如果是你想区分客户端,那么可以设置不同的client.id
7 - 天天向上2019-09-19元数据不包含协调者信息吗?为啥还要再请求一次协调者信息 什么设计思路?
作者回复: 不包括,因为你请求元数据的broker可能不是Coordinator,没有Coordinator的信息
6 - 信信2019-07-24一共建过四次连接。若connection.max.idle.ms 不为-1,最终会断开第一次连的ID为-1的连接。6
- October2019-07-21总共创建4个连接,最终保持3个连接: 确定消费者所属的消费组对应的GroupCoordinator和获取集群的metadata时创建一个TCP连接,由于此时的node id = -1,所以该连接无法重用。 连接GroupCoordinator时,创建第二个TCP连接,node id值为Integer.MAX_VALUE-id 消费者会与每个分区的leader创建一个TCP连接来消费数据,node id为broker.id,由于kafka只是用id这一维度来表征Socket连接信息,因此如果多个分区的leader在同一个broker上时,会共用一个TCP连接,由于分区数大于broker的数量,所以会创建两个TCP连接消费数据。展开6
- 真锅2020-04-25意思是即便知道了协调者在node 2上,还是会依然用2147483645这个id的TCP连接去跟协调者通信吗。
作者回复: 这个数字不是固定的,而是用MAX - broker ID算出来的。对Coordinator的连接来说,是的! 它的ID就是这个算法
4 - Eco2020-01-19应该是3个tcp连接,第一个id=-1的没什么争议,然后是连接协调者的,但是broker,5个分区的leader肯定会分布到这两台broker上,那么第三类tcp就是2个tcp连接,但是这2个中完全可以有一个是直接使用连接协调者的那个tcp连接吧,但老师好像说过连接协调者的连接会和传输数据的分开,id的计算都不相同,好吧,那就4个tcp连接吧。可这里真的不能复用吗?我觉得可以。
作者回复: 目前与Coordinator和普通数据交互的TCP连接的确是分开的,你要说是否能复用,我觉得当然可以复用,只不过现在没有这么设计:)
共 2 条评论4 - 小木匠2019-07-22“负载是如何评估的呢?其实很简单,就是看消费者连接的所有 Broker 中,谁的待发送请求最少。” 老师这个没太明白,这时候消费者不是还没连接么?那这部分信息是从哪获取到的呢?消费者本地吗?
作者回复: 刚开始的时候当然就类似于随机选broker了,但后面慢慢积累了一些数据之后这个小优化还是会起一些作用的
4 - yes2020-06-20老师我有个疑问,consumer在FindCoordinator的时候会选择负载最小的broker进行连接,文章说看消费者连接的所有 Broker 中,谁的待发送请求最少。请问consumer如何得知这个消息?如果它想知道这个消息,不就得先和“某个东西”建立连接了?
作者回复: 它首先倾向于使用已建立连接的节点。如果已建立连接的节点都在使用中,可能会创建新的TCP连接
共 2 条评论3 - Treagzhao2020-05-19老师,“消费者程序会向集群中当前负载最小的那台 Broker 发送请求”,消费者怎么单方面知道服务器待发送的消息数量呢?而且应该只有leader才会实际发送消息吧,follower待发送的都是0,消费者怎么在建立连接之前就知道服务器的角色呢?
作者回复: 是这样判断的,就是看消费者与broker的TCP连接上的待处理请求的个数
2 - 臧萌2019-08-29我们要设计一个消息系统。有两个选择,更好的一种是每种不同schema的消息发一个topic。但是有一种担心是consumer会为每个topic建立一个连接,造成连接数太多。请问胡老师,kafka client的consumer是每个集群固定数目的tcp连接,还是和topic数目相关?
作者回复: 和它要订阅的topic分区数以及这些分区在broker上的散列情况有关。比如你订阅了100个分区,但这个100个分区的leader副本都在一个broker上,那么长期来看consumer也就只和这1个broker建立连接;相反如果这100分区散列在100个broker上,那么长期来看consumer会和100个broker维持长连接
共 2 条评论2 - rm -rf 😊ི2019-07-21我认为也是3个连接,第一个是查找Coordinator的,这个会在后面断开。然后5个partition会分布在2个broker上,那么客户端最多也就连接2次就能消费所有partition了,因此是连接3个,最后保持2个。2
- 艺超(鲁鸣)2020-07-29老师好,请教一个问题,现在对于producer和consumer都介绍了维持tcp连接的情况,那么对于kafka集群 broker来说,这么多的tcp连接,是如何管理的呢?
作者回复: 其实也没什么管理。如果一定要说管理,通过KafkaChannel对象来管理。
共 2 条评论1 - 举个荔枝2019-10-02老师,想问下这里是不是笔误。 还记得消费者端有个组件叫Coordinator吗?协调者应该是位于Broker端的吧?
作者回复: 嗯嗯,其实这里的消费者端指的是广义的消费者,我是想说在Kafka消费者的概念中有Coordinator。当然如你所说Coordinator是Broker端的组件没错。这里的确有不严谨的地方,多谢指出:)
1 - 吴宇晨2019-07-22一个获取元数据的连接(之后会断开)+两个连接分区leader的连接+一个连接协调者的连接1
- 明翼2019-07-20连接有三个阶段:首先获取协调者连接同时也获取元数据信息,这个连接后面会关闭;连接协调者执行,等待分配分区,组协调等,这需要一个连接;后面真正消费五个分区两个broker最多就两个连接,分区大于broker所以一定是两个,因为第一类连接没有id,所以无法重用,会在第三类开启连接后关闭,所以开始四个连接最终保持三个连接1