25 | 消费者组重平衡全流程解析
25 | 消费者组重平衡全流程解析
讲述:胡夕
时长13:57大小12.77M
触发与通知
消费者组状态机
消费者端重平衡流程
Broker 端重平衡场景剖析
小结
开放讨论
赞 13
提建议
精选留言(110)
- LRocccccc置顶2019-08-02是我理解错了么?为什么场景一二三的图中的SyncGroup请求,都是等待Leader分配方案,leader不应该提供方案么?
作者回复: 嗯嗯,确实是。只是目前Kafka要求所有consumer都发送SyncGroup请求给Coordinator,因为分配方案只能通过SyncGroupResponse的方式获取。图中只是想表示这是一种机制,没有太区分consumer leader和其他consumer
共 7 条评论14 - rm -rf 😊ི置顶2019-07-31老师,在Broker 端重平衡场景剖析这个第一个图里面,既然协调者说了成员2是这个组的leader,为啥成员2的SyncGroup请求会是”等待leader分配“?这是笔误吗?后面几幅图好像也这样。。。
作者回复: 只是想表明这是统一的一种机制。。。源代码中肯定没有这样的话。。。
共 3 条评论3 - DFighting2019-09-09重平衡能不能参照JVM中的Minor gc和Major gc,将重平衡分为两步,在资源的角度讲集群进行分区,这里的资源可以理解为分区,因为后两种变化都是涉及到分区——新主题或已有主题的分区数量变化,对于现有的三种重平衡情况分别做如下处理: 1、新成员入区,在当前区内进行重平衡,不要影响其他的分区 2、资源分区中需要消费的分区队列数量发生的变化,也只是涉及到当前分区的重平衡。 这样设计的话就需要处理一个资源分区太空闲和太繁忙时的问题,我觉得可以参考m树的节点分裂和合并,这么做比m树更简单,因为它没有层级关系,只是资源分区的整合和划分而已,实现的时候还能兼顾到网络的局部特性,当然这只是初步想法,没有详细设计和验证,不知道有没有什么地方没有考虑周全,望老师能指点一二。展开
作者回复: 嗯嗯,非常赞的思路。现在社区正在对rebalance进行改革中有很多思想和你也有重合之处。
共 3 条评论44 - Frank2019-08-06这节课,干货很多,我现在有个疑问,重平衡时需要从消费者实例中选择一个leader,让leader来发起重平衡方案,那为啥不直接让协调者组件来处理呢?
作者回复: 客户端自己确定分配方案有很多好处。比如可以独立演进和上线,不依赖于服务器端
共 7 条评论35 - ban2019-08-01不会超过 session.timeout.ms 就能感知 老师,请问下,消费者已经崩溃了,不会发送心跳,协调者这时候怎么做到能到session.timeout.ms感知的。
作者回复: 每次consumer发送心跳时会顺带发送session timeout时间,这样Coordinator收到后会根据这个session timeout时间计算下次deadline时间,如果过了deadline还没有收到直接fail掉该consumer
共 4 条评论31 - Geek_08192020-01-12老师,有个问题文中说joingroup时等待所有消费者上报订阅信息,协调者通过什么判断所有消费者都已经上报了,或者说怎么知道有多少消费者客户端。如果上报信息后,消费者客户端崩溃了,这能等待下次心跳才能知道吗?
作者回复: join group时也是有一个总的超时时间的(取所有member最大的rebalance超时时间),靠这个作为判断是否进入到下一阶段的阈值。
共 7 条评论18 - 菜鸟和谐号2019-08-19不同的消费者消费不同的topic主题的领导者都是一个吗,我记得好像不同topic主题的协调者都不是一个啊,老师求解答
作者回复: 一个组对应一个Coordinator,与topic主题无关
17 - 极极2020-02-20老师,最后一张图,再平衡前,要求在规定时间内提交位移,这个规定时间如何设置?默认值是什么?
作者回复: rebalance timeout,默认是max.poll.intervals的值
共 5 条评论15 - wgcris2019-12-21老师,您好,请教个问题,最近使用consumer消费,发现consumer会出现commit offset failed,coordinator is not available 的错误,导致consumer卡住,无法消费。根据错误信息,是由于找不到groupcoordinator导致的,但coordinator不可以是在什么情况下发生的?一个猜测是由于consumer端发送心跳超时导致groupcoordinator认为该consumer死了,将该consumer剔除该group,导致该consumer不可用,一直卡住,不知道这种解释是否正确展开
作者回复: commit失败先看看是不是消息处理慢导致的吧。比如增加max.poll.interval.ms的值或降低max.poll.records的值试试看。Client端报出Coordinator不可用不一定表示Coordinator真的不可用
14 - 注定非凡2019-11-081 重平衡的通知 A :重平衡过程通过消息者端的心跳线程(Heartbeat Thread)通知到其他消费者实例。 B :Kafka Java消费者需要定期地发送心跳请求到Broker端的协调者,以表明它还存活着。 (1)在kafka 0.10.1.0版本之前,发送心跳请求是在消费者主线程完成的,也就是代码中调用KafkaConsumer.poll方法的那个线程。 这样做,消息处理逻辑也是在这个线程中完成的 ,因此,一旦消息处理消耗了过长的时间,心跳请求将无法及时发到协调者那里,导致协调者错判消费者已死。 (2)在此版本后,kafka社区引入了单独的心跳线程来专门执行心跳请求发送,避免这个问题。 C :重平衡的通知机制是通过心跳线程来完成的,当协调者决定开启新一轮重平衡后,他会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了”REBALANCE_IN_PROGRESS”,就能立即知道重平衡开始了。 D :消费者端的参数 heartbeat.interval.ms的真实用途是控制重平衡通知的频率。 2 消费者组状态机 Kafka设计了一套消费者组状态机(State Machine),帮助协调者完成整个重平衡流程。 A :kafka消费者组状态 (1)Empty:组内没有任何成员,但消费者组可能存在已提交的位移数据,而且这些位移尚未过期。 (2)Dead:组内没有任何成员,但组的元数据信息已经在协调者端被移除。协调者保存着当前向它注册过的所有组信息,所谓元数据就是类似于这些注册信息。 (3)PreparingRebalance:消费者组准备开启重平衡,此时所有成员都要重新请求加消费者组 (4)CompletingRebalance:消费者组下所有成员已经加入,各个成员正在等待分配方案。 (5)stable:消费者组的稳定状态。该状态表明重平衡已经完成,组内成员能够正常消费数据了。 B :Kafka定期自动删除过期位移的条件就是,组要处于Empty状态。如果消费者组停了很长时间(超过7天),那么Kafka很可能就把该组的位移数据删除了。 3 消费者端重平衡流程 A :重平衡的完整流程需要消费者端和协调者组件共同参与才能完成。 B :在消费者端,重平衡分为两个步骤: (1)加入组,对应请求:JoinGroup请求 (2)等待领导者消费者分配方案:SyncGroup请求 C :当组内成员加入组时,他会向协调者发送JoinGroup请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的JoinGroup请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。 D :通常情况下,第一个发送JoinGroup 请求的成员自动成为领导者。这里的领导者是具体的消费者实例,它既不是副本,也不是协调者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。 E :选出领导者之后,协调者会把消费者组订阅信息封装进JoinGroup请求的响应中,然后发给领导者,由领导统一做出分配方案后,进入下一步:发送SyncGroup请求。 F :领导者向协调者发送SyncGroup请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送SyncGroup请求,只是请求体中并没有实际内容。这一步的目的是让协调者接收分配方案,然后统一以SyncGroup 响应的方式发给所有成员,这样组内成员就都知道自己该消费哪些分区了。 4 Broker端重平衡场景剖析 A :新成员入组 当协调者收到新的JoinGroup请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制他们开启新一轮的重平衡。 B :组成员主动离组 消费者实例所在线程或进程调用close()方法主动通知协调者他要退出。这个场景涉及第三类请求:LeaveGroup请求。协调者收到LeaveGroup请求后,依然会以心跳响应的方式通知其他成员。 C :组成员崩溃离组 崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。崩溃离组是被动的,协调者通常需要等待一段时间才能感知,这段时间一般是由消费者端参数session.timeout.ms控制的。 D :重平衡时协调者对组内成员提交位移的处理 正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后在开启正常JoinGroup/SyncGroup请求发送。展开14
- 明翼2019-07-30老师有两个问题请教下: 1)组状态在empty的时候,删除位移信息,这个时间间隔(文中7天)是否可以配置那,还是和普通的默认topic的消息存活时间一样吗? 2)这个设计我有点迷糊,都有协调者了为什么不让协调者统一做订阅分配那,让领导者做不是更麻烦吗?
作者回复: 1. 可以配置offsets.retention.minutes 2. 新版本consumer的一个改进就是把分区分配策略从server端移到consumer端来做。Client端代码演进的速度和容易程度要远胜于服务器端,算是一个优势吧
14 - 绿箭侠2019-10-21老师,看前面也有人问为啥不把订阅分配方案移到协调者上统一去做? 您说Client端代码演进速度 和 容易程度远胜于服务器端,是一个优势。 这里还是没明白,为什么Client端代码演进速度 和 容易程度更好?!!
作者回复: 这只是其中的一个可能的原因。client端代码更新的难度要远小于broker端。如果是broker代码更新,你需要rolling upgrade所有集群中的broker,在生产环境中并不一定有这样的时间窗口
10 - Li Shunduo2019-07-30请问当重平衡开启时,协调者会给予提交位移的缓冲时间是多少?如果超过了会拒绝提交的位移吗?
作者回复: 没有具体的限制。反正如果consumer提交的位移请求到broker端时整个group已经从Preparing进化到Completing了,那么就晚了,broker会拒绝这个提交请求
共 2 条评论9 - 快跑2019-08-01老师你好, 订阅主题数量发生变化是指什么? 怎么才能触发这个场景的发生?
作者回复: consumer.subscribe(Pattern.compile("test.*"))就可以。这样当你新建了一个test开头的topic,订阅信息就发生变化了,需要重新rebalance
共 3 条评论8 - 对与错2020-07-29请问订阅主题发送变化之后触发reblance的流程是什么样子的?
作者回复: consumer会定期更新元数据,如果发现了新增的订阅分区,会主动触发rebalance
7 - ATSS码农哥2020-04-20老师 我有一个问题 您说consumer group里面的consumer leader会提供分配方案 所以这个分配方案是根据coordinator通过joinGroup收集到的所有<topic, partition> 去做一个统一的分配是么? 在结束分配以后 每个consumer都有可能负责和之前完全不一样的分区?
作者回复: 是的,rebalance结束之后非常可能被分配到和之前完全不一样的分区。社区后面加入了一个StickyAssignor,力图最大程度地保证之前分区分配的粘性,可以试试。具体方法是设置partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor
7 - 陈国林2020-01-19老师好,我说下自己的愚见。为了 Rebalance 过程避免 STW,是否可以开启另外一个线程用于 ”预分配“,预分配的算法最大化的保证 consumer 实例消费的状态不变,确保 Rebalance 过程只需要变更部分 consumer 实例。预分配成功后,再走真正的 Rebalance 流程,不知是否可行
作者回复: 短时间内我不确定这个方案是否可行,大体上看是一个很好的想法。如果可以细化的话,不妨提一个KIP:)
7 - 兔2🐰🍃2019-10-24作者回复: 每次consumer发送心跳时会顺带发送session timeout时间,这样Coordinator收到后会根据这个session timeout时间计算下次deadline时间,如果过了deadline还没有收到直接fail掉该consumer 老师说“Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃”,感觉上面回复中的 session timeout 要小于这个消费者端参数,那么心跳请求里的session timeout 是什么计算的?展开
作者回复: 这块需要更正一次,不是在心跳时携带的session timeout,这个时间在加入组的时候就已经发给Coordinator了。
7 - roderickyu2019-08-26老师您好,收到重平衡通知后,如果某个consumer提交位移超时了,那么会造成重复消费吧?是不是只能在应用层去重?
作者回复: 有可能重复。目前用业务去重更保险些
7 - Mr.Brooks2020-05-20消费者组创建的过程中,协调者何时判断所有消费者已加入,还是说它在收到第一个joingroup请求后等一段时间?
作者回复: 嗯,是的。你的理解是对的~
共 2 条评论6