极客时间已完结课程限时免费阅读

加餐 | ZAB协议(三):如何处理读写请求?

加餐 | ZAB协议(三):如何处理读写请求?-极客时间

加餐 | ZAB协议(三):如何处理读写请求?

讲述:于航

时长12:10大小11.14M

你好,我是韩健!
你应该有这样的体会,如果你想了解一个网络服务,执行的第一个功能肯定是写操作,然后才执行读操作。比如,你要了解 ZooKeeper,那么肯定会在 zkCli.sh 命令行中执行写操作(比如“create /geekbang 123”)写入数据,然后再是读操作(比如“get /geekbang”)查询数据。这样一来,你才会直观地理解 ZooKeeper 是如何使用的了。
在我看来,任何网络服务最重要的功能就是处理读写请求,因为我们访问网络服务本质上都是在执行读写操作,ZooKeeper 也不例外。而且对 ZooKeeper 而言,这些功能更为重要,因为在 ZooKeeper 中,如何处理写请求,关乎着操作的顺序性,而操作的顺序性会影响节点的创建;如何处理读请求,关乎着一致性,它们又影响着客户端是否会读到旧数据。
接下来,我会从 ZooKeeper 系统的角度,全面地分析整个读写请求的流程,帮助你更加全面、透彻地理解读写请求背后的原理。
你肯定知道,在 ZooKeeper 中,写请求是必须在领导者上处理,如果跟随者接收到了写请求,它需要将写请求转发给领导者,当写请求对应的提案被复制到大多数节点上时,领导者会提交提案,并通知跟随者提交提案。而读请求可以在任何节点上处理,也就是说,ZooKeeper 实现的是最终一致性。
对你来说,理解了如何处理读写请求,不仅能理解读写这个最重要功能的核心原理,还能更好地理解 ZooKeeper 的性能和一致性。这样一来,当你在实际场景中安装部署 ZooKeeper 的时候,就能游刃有余地做资源规划了。比如,如果读请求比较多,你可以增加节点,配置 5 节点集群,而不是常见的 3 节点集群。
话不多说,我们进入今天的内容,一起探究 ZooKeeper 处理读写请求的背后原理和代码实现。

ZooKeeper 处理读写请求的原理

其实,我在15 讲演示“如何实现操作顺序性”时,就已经介绍了 ZooKeeper 是如何处理读写请求的了。所以在这里我就不啰嗦了,只是在此基础上,再补充几点。
首先,在 ZooKeeper 中,与领导者“失联”的节点,是不能处理读写请求的。比如,如果一个跟随者与领导者的连接发生了读超时,设置了自己的状态为 LOOKING,那么此时它既不能转发写请求给领导者处理,也不能处理读请求,只有当它“找到”领导者后,才能处理读写请求。
举个例子:当发生分区故障了,C 与 A(领导者)、B 网络不通了,那么 C 将设置自己的状态为 LOOKING,此时在 C 节点上既不能执行读操作,也不能执行写操作。
其次,当大多数节点进入到广播阶段的时候,领导者才能提交提案,因为提案提交,需要来自大多数节点的确认。
最后,写请求只能在领导者节点上处理,所以 ZooKeeper 集群写性能约等于单机。而读请求是可以在所有的节点上处理的,所以,读性能是能水平扩展的。也就是说,你可以通过分集群的方式来突破写性能的限制,并通过增加更多节点,来扩展集群的读性能。
熟悉了 ZooKeeper 处理读写请求的过程和原理后,相信你应该好奇这些功能在 ZooKeeper 代码中是如何实现的呢?

ZooKeeper 代码是如何实现读写操作的呢?

如何实现写操作?

我先来说一说写操作,在 ZooKeeper 代码中,处理写请求的核心流程就像下图一样(为了帮你更好的理解这部分内容,我来演示一下复杂的情况,也就是跟随者接收到写请求的情况)。
接下来,咱们一起走一遍核心代码的流程,加深一下印象。
1. 跟随者在 FollowerRequestProcessor.processRequest() 中接收到写请求。具体来说,写请求是系统在 ZooKeeperServer.submitRequestNow() 中发给跟随者的。
firstProcessor.processRequest(si);
而 firstProcessor,是在 FollowerZooKeeperServer.setupRequestProcessors() 中创建的。
protected void setupRequestProcessors() {
// 创建finalProcessor,提交提案或响应查询
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
// 创建commitProcessor,处理提案提交或读请求
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
// 创建firstProcessor,接收发给跟随者的请求
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
// 创建syncProcessor,将提案持久化存储,并返回确认响应给领导者
syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
syncProcessor.start();
}
需要你注意的是,跟随者节点和领导者节点的 firstProcessor 是不同的,这样当 firstProcessor 在 ZooKeeperServer.submitRequestNow() 中被调用时,就分别进入了跟随者和领导者的代码流程。另外,setupRequestProcessors() 创建了 2 条处理链,就像下图的样子。
其中,处理链 1 是核心处理链,最终实现了提案提交和读请求对应的数据响应。处理链 2 实现了提案持久化存储,并返回确认响应给领导者。
2. 跟随者在 FollowerRequestProcessor.run() 中将写请求转发给领导者。
// 调用learner.request()将请求发送给领导者
zks.getFollower().request(request);
3. 领导者在 LeaderRequestProcessor.processRequest() 中接收写请求,并最终调用 pRequest() 创建事务(也就是提案),并持久化存储。
// 创建事务
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
......
// 分配事务标识符
request.zxid = zks.getZxid();
// 调用ProposalRequestProcessor.processRequest()处理写请求,并将事务持久化存储
nextProcessor.processRequest(request);
在这里,需要你注意的是,写请求也是在 ZooKeeperServer.submitRequestNow() 中发给领导者的。
firstProcessor.processRequest(si);
而 firstProcessor,是在 LeaderZooKeeperServer.setupRequestProcessors() 中创建的。
protected void setupRequestProcessors() {
// 创建finalProcessor,最终提交提案和响应查询
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
// 创建toBeAppliedProcessor,存储可提交的提案,并在提交提案后,从toBeApplied队列移除已提交的
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
// 创建commitProcessor,处理提案提交或读请求
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
commitProcessor.start();
// 创建proposalProcessor,按照顺序广播提案给跟随者
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
// 创建prepRequestProcessor,根据请求创建提案
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
// 创建firstProcessor,接收发给领导者的请求
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
......
}
需要你注意的是,与跟随者类似,setupRequestProcessors() 给领导者也创建了 2 条处理链(其中处理链 2 是在创建 proposalRequestProcessor 时创建的)。
其中,处理链 1 是核心处理链,最终实现了写请求处理(创建提案、广播提案、提交提案)和读请求对应的数据响应。处理链 2 实现了提案持久化存储,并返回确认响应给领导者自己。
4. 领导者在 ProposalRequestProcessor.processRequest() 中,调用 propose() 将提案广播给集群所有节点。
zks.getLeader().propose(request);
5. 跟随者在 Follower.processPacket() 中接收到提案,持久化存储,并返回确认响应给领导者。
// 接收、持久化、返回确认响应给领导者
fzk.logRequest(hdr, txn, digest);
6. 当领导者接收到大多数节点的确认响应(Leader.processAck())后,最终在 CommitProcessor.tryToCommit() 提交提案,并广播 COMMIT 消息给跟随者。
// 通知跟随者提交
commit(zxid);
// 自己提交
zk.commitProcessor.commit(p.request);
7. 当跟随者接收到 COMMIT 消息后,在 FollowerZooKeeperServer.commit() 中提交提案,如果最初的写请求是自己接收到的,返回成功响应给客户端。
// 必须顺序提交
long firstElementZxid = pendingTxns.element().zxid;
if (firstElementZxid != zxid) {
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
+ " but next pending txn 0x" +
Long.toHexString(firstElementZxid));
ServiceUtils.requestSystemExit(ExitCode.UNMATCHED_TXN_COMMIT.getValue());
}
// 将准备提交的提案从pendingTxns队列移除
Request request = pendingTxns.remove();
request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
// 最终调用FinalRequestProcessor.processRequest()提交提案,并如果最初的写请求是自己接收到的,返回成功响应给客户端
commitProcessor.commit(request);
这样,ZooKeeper 就完成了写请求的处理。你要特别注意一下,在分布式系统中,消息或者核心信息的持久化存储很关键,也很重要,因为这是保证集群能稳定运行的关键。
当然了,写入数据,最终还是为了后续的数据读取,那么在 ZooKeeper 中,是如何实现读操作的呢?

如何实现读操作?

相比写操作,读操作的处理要简单很多,因为接收到读请求的节点,只需要查询本地数据,然后响应数据给客户端就可以了。读操作的核心代码流程,如图所示。
咱们一起走一遍核心代码的流程,加深一下印象。
1. 跟随者在 FollowerRequestProcessor.processRequest() 中接收到读请求。
2. 跟随者在 FinalRequestProcessor.processRequest() 中查询本地数据,也就是 dataTree 中的数据。
// 处理读请求
case OpCode.getData: {
......
// 查询本地dataTree中的数据
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
......
break;
}
3. 然后跟随者响应查询到数据给客户端。
case OpCode.getData : {
......
// 响应查询到的数据给客户端
cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
break;
}
你看,这样 ZooKeeper 就完成读操作的处理。在这里,我想补充一点,你可以 dataTree 理解为 Raft 的状态机,提交的数据,最终存放在 dataTree 中。

内容小结

本节课我主要带你了解了 ZooKeeper 处理读写请求的过程,以及 ZooKeeper 的代码实现和核心流程。我希望你明确这样几个重点。
1. 与领导者“失联”的跟随者(比如发生分区故障时),是既不能处理写请求,也不能处理读请求的。
2. 在 ZooKeeper 中,写请求只能在领导者节点上处理,读请求可以在所有节点上处理,实现的是最终一致性。
因为本讲是 ZAB 协议的最后一讲,为了帮你后续学习本课程没有提到的内容,我想补充几点。
首先,ZAB 的术语众多,而且有些术语表达的是同一个含义,这些术语有些在文档中出现,有些在代码中出现。而你只有准确理解术语,才能更好地理解 ZAB 协议的原理,所以,我补充一些内容。
提案(Proposal):进行共识协商的基本单元,你可以理解为操作(Operation)或指令(Command),常出现在文档中。
事务(Transaction):也是指提案,常出现在代码中。比如,pRequest2Txn() 将接收到的请求转换为事务;再比如,未提交提案会持久化存储在事务日志中。在这里需要你注意的是,这个术语很容易引起误解,因为它不是指更广泛被接受的含义,具有 ACID 特性的操作序列。
其次,在我看来,Raft 算法和 ZAB 协议很类似,比如主备模式(也就是领导者、跟随者模型)、日志必须是连续的、以领导者的日志为准来实现日志一致等等。那为什么它们会比较类似呢?
我的看法是,“英雄所见略同”。比如 ZAB 协议要实现操作的顺序性,而 Raft 的设计目标,不仅仅是操作的顺序性,而是线性一致性,这两个目标,都决定了它们不能允许日志不连续,要按照顺序提交日志,那么,它们就要通过上面的方法实现日志的顺序性,并保证达成共识(也就是提交)后的日志不会再改变。
最后,我想就 ZAB 和 Raft 做个对比,来具体说说 ZAB 和 Raft 的异同。既然我们要做对比,那么首先要定义对比标准,我是这么考虑的:你应该有这样的体会,同一个功能,不同的同学实现的代码都会不一样(比如数据结构、代码逻辑),所以过于细节的比较,尤其是偏系统实现方面的,意义不大(比如跟随者是否转发写请求到领导者,不仅意义不大,而且这是 ZAB 和 Raft 都没有约定的,是集群系统需要考虑的),我们可以从核心原理上做对比。
领导者选举:ZAB 采用的“见贤思齐、相互推荐”的快速领导者选举(Fast Leader Election),Raft 采用的是“一张选票、先到先得”的自定义算法。在我看来,Raft 的领导者选举,需要通讯的消息数更少,选举也更快。
日志复制:Raft 和 ZAB 相同,都是以领导者的日志为准来实现日志一致,而且日志必须是连续的,也必须按照顺序提交。
读操作和一致性:ZAB 的设计目标是操作的顺序性,在 ZooKeeper 中默认实现的是最终一致性,读操作可以在任何节点上执行;而 Raft 的设计目标是强一致性(也就是线性一致性),所以 Raft 更灵活,Raft 系统既可以提供强一致性,也可以提供最终一致性。
写操作:Raft 和 ZAB 相同,写操作都必须在领导者节点上处理。
成员变更:Raft 和 ZAB 都支持成员变更,其中 ZAB 以动态配置(dynamic configuration)的方式实现的。那么当你在节点变更时,不需要重启机器,集群是一直运行的,服务也不会中断。
其他:相比 ZAB,Raft 的设计更为简洁,比如 Raft 没有引入类似 ZAB 的成员发现和数据同步阶段,而是当节点发起选举时,递增任期编号,在选举结束后,广播心跳,直接建立领导者关系,然后向各节点同步日志,来实现数据副本的一致性。在我看来,ZAB 的成员发现,可以和领导者选举合到一起,类似 Raft,在领导者选举结束后,直接建立领导者关系,而不是再引入一个新的阶段;数据同步阶段,是一个冗余的设计,可以去除的,因为 ZAB 不是必须要先实现数据副本的一致性,才可以处理写请求,而且这个设计是没有额外的意义和价值的。
另外,ZAB 和 ZooKeeper 强耦合,你无法在实际系统中独立使用;而 Raft 的实现(比如 Hashicorp Raft)是可以独立使用的,编程友好。

课堂思考

我提到 ZooKeeper 提供的是最终一致性,读操作可以在任何节点上执行。那么如果读操作访问的是备份节点,为什么无法保证每次都能读到最新的数据呢?欢迎在留言区分享你的看法,与我一同讨论。
最后,感谢你的阅读,如果这节课让你有所收获,也欢迎你将它分享给更多的朋友。
分享给需要的人,Ta购买本课程,你将得18
生成海报并分享

赞 8

提建议

上一篇
加餐 | ZAB协议(二):如何从故障中恢复?
下一篇
加餐 | MySQL XA是如何实现分布式事务的?
 写留言

精选留言(16)

  • zyz
    2020-05-21
    Zookeeper通过Leader来主导写操作,保证了顺序一致性。当一半以上的节点返回已写入,就返回客户端已写入,但是这时候只是部分节点写入,有的节点可能还没有同步上数据,所以读取备份节点可能不是最新的。同时Zookeeper的单一视图特征,保证客户端看到的数据不会比在之前服务器上所看到的更老。

    作者回复: 加一颗星:)

    共 5 条评论
    9
  • zyz
    2020-05-20
    老师!Zookeeper版本3.5.0开始支持dynamic configuration,成员变更的时候,不需要重启了吧

    作者回复: 加一颗星:),感谢反馈,采用dynamic configuration进行成员变更,不需要重启的,已修正。

    4
  • 王伟建
    2020-12-27
    老师有几个问题不太理解: 1.zk的写请求是领导者节点的两阶段提交,说写请求是强一致性,是说这个两阶段过程未完成之前不允许其他操作,所以说他是强一致性吗? 2.zk的读只能提供顺序一致性,也就是说他可能读到旧的版本的数据,那为什么还要把zk归为CP类型的系统呢?CAP里的C 不应该是强一致性吗,说到底感觉还是对这个一致性没理解透,就之前的理解来说,我认为C是指每次读取的数据都是最近一次写入的数据,而不是过期的数据。希望老师能讲解一下这块儿。 3.利用zk来实现分布式锁,多个服务同时去拿锁时,如果zk提供的读不是强一致性,那么会不会读到旧的锁信息?这块儿是怎么保证每个服务拿到的都是最新的数据,实现上来说是靠sync读吗?
    展开
    共 3 条评论
    3
  • 路人
    2020-09-03
    写我看用的是2pc,2pc中有些如果只有部分commit成功,zookeeper会怎么处理呢?是有什么补偿机制么?
    共 1 条评论
    2
  • 悟空聊架构
    2022-03-22
    课后题:我提到 ZooKeeper 提供的是最终一致性,读操作可以在任何节点上执行。那么如果读操作访问的是备份节点,为什么无法保证每次都能读到最新的数据呢? 回答:因为主节点发送 commit 消息给所有备份节点时,备份节点执行 commit 的时机不一定都是同步完成的,只有当 commit 之后,客户端读取的数据才是最新的,比如备份节点 B 先commit,客户端 1 连接的是 备份节点 B,那么客户端 1 肯定读到的是最新的,但是如果客户端连接的是备份节点 C,但是节点 C 还没有收到 commit 消息或者收到了,还没来得及 commit,客户端就发起请求了,这个时候读到的就是旧数据。但是过了短暂时间后,所有备份节点都 commit 了,这个时候任何客户端都可以读到最新的一致性数据了,这个就是最终一致性。 补充:这里 commit 操作就是将数据 放到 znode 内存数据结构上,这样客户端就可以读到最新的数据了。
    展开
  • 阿kai(aeo
    2022-01-13
    为什么觉得代码顺序是反的呢?比如下面这段FinalRequestProcessor怎么是首先创建的呢? RequestProcessor finalProcessor = new FinalRequestProcessor(this); // 创建finalProcessor,最终提交提案和响应查询 // 创建toBeAppliedProcessor,存储可提交的提案,并在提交提案后,从toBeApplied队列移除已提交的 RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); // 创建commitProcessor,处理提案提交或读请求 commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); commitProcessor.start(); // 创建proposalProcessor,按照顺序广播提案给跟随者 ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); // 创建prepRequestProcessor,根据请求创建提案 prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); prepRequestProcessor.start(); // 创建firstProcessor,接收发给领导者的请求 firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
    展开
  • 姑射仙人
    2021-03-10
    老师,那么Zookeeper是CP呢,还是AP呢?读是最终一致,那么我理解是AP。但之前读的一篇文章,说Zookeeper不适合作为注册中心,说Zookeeper是CP,AP更适合作为注册中心,因为是读多写少,不要求马上能看到新的实例更新。 至于同步机制,Zookeeper也是支持增量更新,这样看来完全没有问题。麻烦老师指点下。
    共 3 条评论
  • 渔夫
    2021-02-16
    韩老师,想问下分布式集群模式下,FollowerZooKeeperServer.setupRequestProcessors方法是如何被调用的呢,找了好久找不到。不像standolone模式那么好找
  • Tesla
    2021-01-04
    老师,请问zk的最终一致性是以提升读的效率为目标的嘛,写的速率和强一致性的算法差不多吧?
    共 1 条评论
  • Heaven
    2020-08-20
    虽然领导者向着大部分的跟随者节点发送了commit请求,但是并不会等待跟随者响应完成写入再返回客户端,而是直接发送了一个完成消息给接收到客户端请求的节点,这就导致,客户端去读取的时候,可能读取到的节点是没有数据生效的节点,所以没法保证每次都读到最新的数据
  • Kvicii.Y
    2020-08-15
    Raft的强一致性是体现在读写都由领导者完成;ZAB的最终一致性是读写可以由领导者也可由Follower或Observer转发完成,如果ZAB想要实现强一致性,似乎也是可以通过sync API完成的吧
  • 爱德华
    2020-07-07
    老师,在说处理写请求的时候好像有点问题。本文中所说的是leader要在第二阶段(commit)后才会返回给客户端成功。但是在前几讲中,好像是说zab在第一阶段,收到大多数响应后就返回给客户端成功。那么这两个说法哪个正确呢?

    作者回复: 加一颗星:),第二个说法,在文中我没有搜到,如果方便,帮忙补充下哈。如果第二个说法去掉“第一阶段”和“就”,那么这两个说法,在我看来,都是正确的,一个侧重代码实现,一个侧重算法原理,就像同样是一棵树,生物学家和文学家的描述是不同的。

    共 2 条评论
  • Kvicii.Y
    2020-07-02
    领导者CommitProcessor.tryToCommit() 提交提案的方法似乎在3.6.0在Leader类中

    作者回复: 加一颗星:),是的

  • Dovelol
    2020-06-21
    老师好,想问下zab协议处理写请求要这么多步骤,那还能保证性能吗?如果其中某一小步骤延迟或阻塞都会影响写的性能把。

    作者回复: 加一颗星:),性能是相对,相比原子提交协议(比如2PC),共识算法的性能要高些。阻塞属于异常情况,在实际场景中,如果代码本身没有缺陷,在绝大多数时候,系统都是正常运行的,不存在阻塞的,也不会影响到写的性能。

    1
  • xzy
    2020-05-26
    你好,既然 zk 只能保证最终一致性,那么在分布式系统中,如 kafka、hbase 等,用 zk 做元数据管理岂不是有问题

    作者回复: 加一颗星:),绝大多数时候,业务对数据的新旧是没那么敏感的。

    共 2 条评论
    1
  • DY-杨
    2020-05-20
    老师,咨询下。paxos算法从您讲解下来好像仅是对某个提案达成共识。没有看到故障恢复或日志恢复的过程啊。反倒zab和raft有。那您知道用这种共识算法的软件是自研的日志恢复吗?另外联合共识算法是什么呢。

    作者回复: 加一颗星:),算法和工程实现之间存在差异,对于Paxos而言,差异就更大了,缺少工程实现的必须细节,这也是为什么大家现在主要选择了raft的原因。故障恢复,是ZAB定义的,Raft没有定义这个阶段。问题1:若使用paxos,需要自己权衡,可以使用它的提到的方法,新领导者通过prepare阶段来发现之前选定的值,也可以自己设计,需要整体考虑。问题2:联合共识,是Raft中处理成员变更的一种方法。

    1