27 | 关于高水位和Leader Epoch的讨论
27 | 关于高水位和Leader Epoch的讨论
讲述:胡夕
时长16:55大小15.49M
什么是高水位?
高水位的作用
高水位更新机制
副本同步机制解析
Leader Epoch 登场
小结
开放讨论
赞 13
提建议
精选留言(94)
- 你好旅行者2019-08-10老师列举了数据丢失的场景,我补充一个数据丢失的场景吧: 假设集群中有两台Broker,Leader为A,Follower为B。A中有两条消息m1和m2,他的HW为1,LEO为2;B中有一条消息m1,LEO和HW都为1.假设A和B同时挂掉,然后B先醒来,成为了Leader(假设此时的min.insync.replicas参数配置为1)。然后B中写入一条消息m3,并且将LEO和HW都更新为2.然后A醒过来了,向B发送FetchrRequest,B发现A的LEO和自己的一样,都是2,就让A也更新自己的HW为2。但是其实,虽然大家的消息都是2条,可是消息的内容是不一致的。一个是(m1,m2),一个是(m1,m3)。 这个问题也是通过引入leader epoch机制来解决的。 现在是引入了leader epoch之后的情况:B恢复过来,成为了Leader,之后B中写入消息m3,并且将自己的LEO和HW更新为2,注意这个时候LeaderEpoch已经从0增加到1了。 紧接着A也恢复过来成为Follower并向B发送一个OffsetForLeaderEpochRequest请求,这个时候A的LeaderEpoch为0。B根据0这个LeaderEpoch查询到对应的offset为1并返回给A,那么A就要对日志进行截断,删除m2这条消息。然后用FetchRequest从B中同步m3这条消息。这样就解决了数据不一致的问题。展开共 10 条评论58
- QQ怪2019-08-03这篇文章有点深度了,看了几遍才看懂共 7 条评论32
- 钱2019-08-18今天的课程很棒,知识密度比较大,小结一下 1:啥是高水位? 水位,我的理解就是水平面当前的位置,可以表示水的深度。在kafka中水位用于表示消息在分区中的位移或位置,高水位用于表示已提交的消息的分界线的位置,在高水位这个位置之前的消息都是已提交的,在高水位这个位置之后的消息都是未提交的。所以,高水位可以看作是已提交消息和未提交消息之间的分割线,如果把分区比喻为一个竖起来的水容器的话,这个表示就更明显了,在高水位之下的消息都是已提交的,在高水位之上的消息都是未提交的。 高水位的英文是High Watermark ,所以其英文缩写为HW。 值得注意的是,Kafka 中也有低水位(Low Watermark,英文缩写为LW),它是与 Kafka 删除消息相关联的概念。 再加一个概念,LEO——Log End Offset 的缩写——意思是日志末端位移,它表示副本写入下一条消息的位移值——既分区中待写入消息的位置。这个位置和高水位之间的位置包括高水位的那个位置,就是所有未提交消息的全部位置所在啦——未提交的消息是不能被消费者消费的。所以,同一个副本对象,其高水位值不会大于 LEO 值。 高水位和 LEO 是副本对象的两个重要属性。Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。 2:高水位有啥用? 2-1:定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的——已提交的消息是可以被消费者消费的。 2-2:帮助 Kafka 完成副本同步——明确那些消息已提交那些未提交,才好进行消息的同步。 3:高水位怎么管理? 这个不好简单的描述,牢记高水位的含义,有助于理解更新高水的时机以及具体步骤。 高水位——用于界定分区中已提交和未提交的消息。 4:高水有舍缺陷? Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。 5:啥是 leader epoch? 可以大致认为就是leader的版本。 它由两部分数据组成。 5-1:Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。 5-2:起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。 6:leader epoch 有啥用? 通过 Leader Epoch 机制,Kafka 规避了因为Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配,而引起的很多“数据丢失”或“数据不一致”的问题。 7:leader epoch 怎么管理? 需要再看看,还不能简单描述出来。展开27
- 朱东旭2019-11-02胡老师您好,在您讲的leader epoch机制案例中,在我看来最关键的操作是broker重启后先向leader确认leo,而不是直接基于自己的高水位截断数据,来防止数据不一致。。可是有无leader epoch都可以做这个操作呀,我看不出leader epoch必要性在哪。。
作者回复: epoch还有其他的作用,比如执行基本的fencing逻辑等
共 3 条评论14 - 常超2019-08-08前面有几个同学提过了,请老师再看一下。 >与 Leader 副本保持同步的两个判断条件。 >1. 该远程 Follower 副本在 ISR 中。 >2. ... >如果 Kafka 只判断第 1 个条件的话,就可能出现某些副本具备了“进入 ISR”的资格,但却尚未进入到 ISR 中的情况。此时,分区高水位值就可能超过 ISR 中副本 LEO,而高水位 > LEO 的情形是不被允许的。 应该改成“如果 Kafka 只判断第 2 个条件的话,...” 吧? 按照现在的说法,上面那句话可以扩展成,如果只判断远程Follower副本是否在ISR中的话,就可能出现某些副本具备了“进入 ISR”的资格,但却尚未进入到 ISR 中的情况。此时,分区高水位值就可能超过 ISR 中副本 LEO,而高水位 > LEO 的情形是不被允许的。 这样是说不通的吧。 换个问法,比如,条件1有副本a,b, 条件2有副本b,c(其中c满足进入1的条件,但还没进入1),老师是想说,“只判断1,a会被误判为同步状态”,还是“只判断2,c会被误判为同步状态”呢?展开共 8 条评论11
- 李先生2020-04-14胡哥,有两个问题: 1:为什么broker重启要进行日志截断,触发日志截断的前提是什么?目的是什么? 2:acks=all,是代表同步到所有isr中broker的pagecache中还是磁盘?min.insync.replicas是配合acks=all来使用的,是一个保证消息可靠性的配置,比如设置为2,是代表在isr中至少两个broker上写入消息,这个写入是写入pagecache中还是磁盘中?如果都是写入pagecache中,kafka是有异步线程来定时从pagecache中拉消息写入磁盘吗?展开
作者回复: 1. broker崩溃前可能写入了部分不完整的消息。这部分数据显然不能算做成功提交,因此在重启回来后要执行截断操作,将底层日志调整回到合法的状态上。 2. 你可以认为都是pagecache。pagecache落盘完全由OS来完成,不由Kafka控制。
共 2 条评论9 - 知易2019-08-26文中老师举例说明数据丢失场景,其中有一处疑惑。 原文。。“当执行完截断操作后,副本 B 开始从 A 拉取消息,执行正常的消息同步。如果就在这个节骨眼上,副本 A 所在的 Broker 宕机了,那么 Kafka 就别无选择,只能让副本 B 成为新的 Leader,此时,当 A 回来后,需要执行相同的日志截断操作,即将高水位调整为与 B 相同的值,也就是 1。这样操作之后,位移值为 1 的那条消息就从这两个副本中被永远地抹掉了。这就是这张图要展示的数据丢失场景。” 其中,A宕机前其高水位为2,此时回来进行日志截断不应该还是2么,为啥要调整为与leaderB一样的水位值?前面B宕机回来的时候,进行日志截断也还是保持其宕机前的值1,并没有调整为与leaderA一样的水位值呢? 这里不是没有理解到,请老师解惑。感谢。展开共 2 条评论9
- td9011052020-01-19老师您好,我怎么感觉只需要在副本拉取Leader的LOG就不会产生日志截断的问题了,感觉不需要Leader Epoch?
作者回复: 日志截断主要是因为follower必须要与leader保持一致,而一旦某个“落后”的副本成为leader,其他“领先”的follower必须与其保持一致,必须truncate掉自己多余的消息。至于如何在这个过程中保持副本间的一致,社区之前使用高水位机制,但发现有一些固有的缺陷,进而开发了leader epoch。如果你能提出更简单的算法,欢迎写下来:)
共 6 条评论8 - faunjoe2020-05-10多个broker 中的leader epoch 他们是版本号是怎么保持累加的
作者回复: Kafka缓存了LeaderEpochCache来保证
7 - Johnson2020-04-13老师您好,有个疑问,leader epoch怎么做到像hw里的数据可见性的,比如hw可以保证消费端只能消费hw之前提交的消息,leader epoch如何保证这点,谢谢!
作者回复: leader epoch做不到这点。leader epoch只是保证在执行日志截断时不会出现因leader/follower副本接连宕机导致的不一致性。除此之外的功能依然由HW提供
共 2 条评论7 - 😈😈😈😈😈2019-10-22这个是我理解的HW和LEO更新的机制步骤,有错误的话请大神指明下,非常感谢 更新对象 更新时机 Broker1上Follower副本 Follwer会从Leader副本不停的拉取数据,但是Leader副本现在的没有数据。所以Leader副本和Follower副本的高水位值和LEO值都是0 Broker0上的Leader副本 生产者向Leader副本中写入一条数据,此时LEO值是1,HW值是0。也就是说位移为0的位置上已经有数据了 Broker1上Follower副本 由于Leader副本有了数据,所以Follower可以获取到数据写入到自己的日志中,且标记LEO值为1,此时在Followe位移值为0的位置上也有了数据,所以此时Follower的HW=0,LEO=1 Broker1上Follower副本 获取到数据之后,再次向Leader副本拉数据,这次请求拉取的数据是位移值1上的数据 Broker0上的远程副本 Leader收到Follower的拉取请求后,发现Follower要拉取的数据是在位移值为1的位置上的数据,此时会更新远程副本的LEO值为1。所以所有的远程副本的LEO等于各自对应的Follower副本的LEO值 Brober0上的Leader副本 Broker0上的远程副本的LEO已经更新为1了。所以开始更新Leader副本的HW值。HW=max{HW,min(LEO1,LEO2,LEO3......LEON)},更新HW值为1,之后会发送Follower副本请求的数据(如果有数据的话,没有数据的话只发送HW值)并一起发送HW值 Broker1上Follower副本 Follwer副本收到Leader返回的数据和HW值(如果Leader返回了数据那么LEO就是2,没有数据的话LEO还是1),用HW值和自己的LEO值比较选择较小作为自己的HW值并更新HW值为1(如果俩个值相等的话HW=LEO) 一次副本间的同步过程完成展开
作者回复: 挺好的,没有什么意见:)
共 2 条评论7 - thomas2020-04-26倘若此时副本 B 所在的 Broker 宕机,当它重启回来后,副本 B 会执行日志截断操作,将 LEO 值调整为之前的高水位值,也就是 1。 -------------------------------------------------------------> 老师,请问为什么要将LE0的值设置为HW的值。LEO的值是消息写入磁盘后才被更新的,也就是数据已经落地。重启后继续用LEO的值会有什么问题吗
作者回复: 必须要调整到水位值,因为即使消息被写入到磁盘上了,不代表这条消息就提交成功了。未成功提交的消息即使写入了磁盘也要做截断
共 2 条评论6 - 常超2019-08-07请问老师,与 Leader 副本保持同步的两个判断条件,是OR还是AND的关系?
作者回复: AND
共 3 条评论6 - Geek_08192020-01-21胡老师有个疑问:生产者同步发送消息时指定同步到所有副本,生产者是等待所有副本的LEO都写入成功才返回吗?如果是这样Follower副本从Leader上拉取LEO是有时间间隔的,这样生产者都在这里等待很久吗?还是其他方式的交互?
作者回复: 配置acks=-1的生产者会等待ISR中所有副本都同步了该消息才会认为消息成功提交。确实,有些情况下生产者会等待一段时间,这通常是线上环境中producer延迟的主要诱因。至于多久则因具体场景而定了~
共 3 条评论5 - 我来也2019-08-031.该远程 Follower 副本在 ISR 中。 如果 Kafka 只判断第 1 个条件的话,就可能出现某些副本具备了“进入 ISR”的资格,但却尚未进入到 ISR 中的情况。 ———————— 这里是不是把条件的编号写反了?展开
作者回复: 没写反啊?就是想说只靠第一个条件不充分
共 4 条评论4 - 店小二#22020-07-09引用一下@thomas与老师交流。 thomas iii. 更新 currentHW = max{currentHW, min(LEO-1, LEO-2, ……,LEO-n)} -------------------------------------------------------------------------------< 老师, LEO-1,LEO-2...LEO-n 都是在ISR集合中的,我认为 currentHW= min(LEO-1, LEO-2, ……,LEO-n就可以,请问在什么场景下currentHW 会大于 min(LEO-1, LEO-2, ……,LEO-n) 作者回复: 有些落后很多的follower可能出现这种情况 ==================================================== 老师的解答的场景我理解了。但是不明白的是,该场景下,恢复过来的follower副本replica.lag.time.max.ms < 10s,且还未到ISR中时,此时的分区高水位不是也大于LEO了么?展开
作者回复: 你说的是有一定道理的。我只能给出这样的答案:分布式中的交互很多东西其实是不确定的,我们很难穷尽所有的可能性。因此我觉得这里的写法有防御性编程的味道。你可以试着删掉然后改用你的写法跑一遍测试用例看看。也许是一个不错的优化点。
3 - 亚洲舞王.尼古拉斯赵...2019-11-06我还奇怪为什么老师讲的和Apache kafka实战这部分内容差不多,还以为是抄袭,后来一看,原来老师就是我看的这本书的作者,😂
作者回复: 在所有需要回复的人当中,你的名字是我最喜欢的,没有之一:)
共 3 条评论3 - 信信2019-08-04原文中“如果 Kafka 只判断第 1 个条件的话”--这里应该是:第2个条件?评论区其他人也有提到 对这块的个人理解: 两个条件之间的关系是与不是或 这里想表达的应该是--这个即将进入isr的副本的LEO值比分区高水位小,但满足条件2; 文中对条件2的描述好像有点歧义,以下是网上找的一段: 假设replica.lag.max.messages设置为4,表明只要follower落后leader不超过3,就不会从同步副本列表中移除。replica.lag.time.max设置为500 ms,表明只要follower向leader发送请求时间间隔不超过500 ms,就不会被标记为死亡,也不会从同步副本列中移除。展开
作者回复: replica.lag.max.messages已经被移除了,不要看这篇了。你可以看看我之前写的这篇:Kafka副本管理—— 为何去掉replica.lag.max.messages参数(https://www.cnblogs.com/huxi2b/p/5903354.html)
共 2 条评论3 - lmtoo2019-08-03“当获知到 Leader LEO=2 后,B 发现该 LEO值不比它自己的 LEO 值小,而且缓存中也没有保存任何起始位移值 > 2 的 Epoch 条目”是什么意思? 如果follower B重启回来之后去取Leader A的LEO,但是此时Leader A已经挂了,这套机制不就玩不转了吗?
作者回复: 这套机制防止的是根据HW做日志截断出现数据不一致,不能防止任何情况下副本都正常工作
共 3 条评论3 - J.Smile2020-07-06老师好,有个问题请教一下: 日志段文件在删除时,先被标记为deleted,然后延迟默认1分钟后删除,如果此时我的客户端要访问这个日志文件,那么这个被标记为deleted的日志文件还会被删除吗?物理文件和内存中的文件分别是怎样的?也就是说会不会导致openfiles由于客户端的访问而不被释放掉?
作者回复: 会。加了delete后缀的日志文件不会被访问到了
2