极客时间已完结课程限时免费阅读

08 | 答疑解惑(一) : 网关如何接收服务端的秒杀结果?

08 | 答疑解惑(一) : 网关如何接收服务端的秒杀结果?-极客时间

08 | 答疑解惑(一) : 网关如何接收服务端的秒杀结果?

讲述:李玥

时长12:52大小11.77M

你好,我是李玥。
我们的“消息队列高手课”专栏自从上线到现在,同学们的学习热情和参与度都非常高。每一节课都有很多同学留言评论,这些留言里有总结知识分享收获的,有提出精彩问题的,还有给自己加油打气立 Flag 的,竟然还有说老师长得像黄渤的。我又仔细去看了一下配图,还是真挺像的。下次老师和极客时间的设计师小姐姐说一样,让她们照着吴彦祖来 P 图。
同学们每一条的留言我都认真看过,大部分留言我都给出了回复。在基础篇的最后一节课,我来统一解答一下大家都比较关注的一些问题。

1. 网关如何接收服务端的秒杀结果?

在《01 | 为什么需要消息队列?》这节课里面,我们举了一个秒杀的例子,这个例子是用来说明消息队列是如何来实现异步处理的。课后很多同学留言提问,网关在发送消息之后,是如何来接收后端服务的秒杀结果,又如何来给 APP 返回响应的呢?
在解答这个问题之前,我需要先说一下,实际生产环境中的秒杀系统,远比我们举的这个例子复杂得多,实现方案也是多种多样的,不是说一定要按照我们这个例子的方式来实现。
在这个例子中,网关接收后端服务秒杀结果,实现的方式也不只一种,这里我给大家提供一个比较简单的方案。
比如说,用 Java 语言来举例子:
public class RequestHandler {
// ID生成器
@Inject
private IdGenerator idGenerator;
// 消息队列生产者
@Inject
private Producer producer;
// 保存秒杀结果的Map
@Inject
private Map<Long, Result> results;
// 保存mutex的Map
private Map<Long, Object> mutexes = new ConcurrentHashMap<>();
// 这个网关实例的ID
@Inject
private long myId;
@Inject
private long timeout;
// 在这里处理APP的秒杀请求
public Response onRequest(Request request) {
// 获取一个进程内唯一的UUID作为请求id
Long uuid = idGenerator.next();
try {
Message msg = composeMsg(request, uuid, myId);
// 生成一个mutex,用于等待和通知
Object mutex = new Object();
mutexes.put(uuid, mutex)
// 发消息
producer.send(msg);
// 等待后端处理
synchronized(mutex) {
mutex.wait(timeout);
}
// 查询秒杀结果
Result result = results.remove(uuid);
// 检查秒杀结果并返回响应
if(null != result && result.success()){
return Response.success();
}
} catch (Throwable ignored) {}
finally {
mutexes.remove(uuid);
}
// 返回秒杀失败
return Response.fail();
}
// 在这里处理后端服务返回的秒杀结果
public void onResult(Result result) {
Object mutex = mutexes.get(result.uuid());
if(null != mutex) { // 如果查询不到,说明已经超时了,丢弃result即可。
// 登记秒杀结果
results.put(result.uuid(), result);
// 唤醒处理APP请求的线程
synchronized(mutex) {
mutex.notify();
}
}
}
}
在这个方案中,网关在收到 APP 的秒杀请求后,直接给消息队列发消息。至于消息的内容,并不一定是 APP 请求的 Request,只要包含足够的字段就行了,比如用户 ID、设备 ID、请求时间等等。另外,还需要包含这个请求的 ID 和网关的 ID,这些后面我们会用到。
如果发送消息失败,可以直接给 APP 返回秒杀失败结果,成功发送消息之后,线程就阻塞等待秒杀结果。这里面不可能无限等待下去,需要设定一个等待的超时时间。
等待结束之后,去存放秒杀结果的 Map 中查询是否有返回的秒杀结果,如果有就构建 Response,给 APP 返回秒杀结果,如果没有,按秒杀失败处理。
这是处理 APP 请求的线程,接下来我们来看一下,网关如何来接收从后端秒杀服务返回的秒杀结果。
我们可以选择用 RPC 的方式来返回秒杀结果,这里网关节点是 RPC 服务端,后端服务为客户端。之前网关发出去的消息中包含了网关的 ID,后端服务可以通过这个网关 ID 来找到对应的网关实例,秒杀结果中需要包含请求 ID,这个请求 ID 也是从消息中获取的。
网关收到后端服务的秒杀结果之后,用请求 ID 为 Key,把这个结果保存到秒杀结果的 Map 中,然后通知对应的处理 APP 请求的线程,结束等待。我刚刚说过,处理 APP 请求的线程,在结束等待之后,会去秒杀的结果 Map 中查询这个结果,然后再给 APP 返回响应。
我把这个处理过程的流程图放在这里,便于你理解:
这个解决方案还不是一个性能最优的方案,处理 APP 请求的线程需要同步等待秒杀结果。后面课程中我们会专门来讲,如何使用异步方式来提升程序的性能。

2. 详解 RocketMQ 和 Kafka 的消息模型

我在看《03 | 消息模型:主题和队列有什么区别?》这节课的留言时发现,不少同学对 RocketMQ 和 kafka 的消息模型理解的还不是很透彻,这两个消息队列产品的消息模型是一样的,我在这里,再把这个模型相关的概念,通过一个例子详细地说一说。
假设有一个主题 MyTopic,我们为主题创建 5 个队列,分布到 2 个 Broker 中。
先说消息生产这一端,假设我们有 3 个生产者实例:Produer0,Produer1 和 Producer2。
这 3 个生产者是如何对应到 2 个 Broker 的,又是如何对应到 5 个队列的呢?这个很简单,不用对应,随便发。每个生产者可以在 5 个队列中轮询发送,也可以随机选一个队列发送,或者只往某个队列发送,这些都可以。比如 Producer0 要发 5 条消息,可以都发到队列 Q0 里面,也可以 5 个队列每个队列发一条。
然后说消费端,很多同学没有搞清楚消费组、消费者和队列这几个概念的对应关系。
每个消费组就是一份订阅,它要消费主题 MyTopic 下,所有队列的全部消息。注意,队列里的消息并不是消费掉就没有了,这里的“消费”,只是去队列里面读了消息,并没有删除,消费完这条消息还是在队列里面。
多个消费组在消费同一个主题时,消费组之间是互不影响的。比如我们有 2 个消费组:G0 和 G1。G0 消费了哪些消息,G1 是不知道的,也不用知道。G0 消费过的消息,G1 还可以消费。即使 G0 积压了很多消息,对 G1 来说也没有任何影响。
然后我们再说消费组的内部,一个消费组中可以包含多个消费者的实例。比如说消费组 G1,包含了 2 个消费者 C0 和 C1,那这 2 个消费者又是怎么和主题 MyTopic 的 5 个队列对应的呢?
由于消费确认机制的限制,这里面有一个原则是,在同一个消费组里面,每个队列只能被一个消费者实例占用。至于如何分配,这里面有很多策略,我就不展开说了。总之保证每个队列分配一个消费者就行了。比如,我们可以让消费者 C0 消费 Q0,Q1 和 Q2,C1 消费 Q3 和 Q4,如果 C0 宕机了,会触发重新分配,这时候 C1 同时消费全部 5 个队列。
再强调一下,队列占用只是针对消费组内部来说的,对于其他的消费组来说是没有影响的。比如队列 Q2 被消费组 G1 的消费者 C1 占用了,对于消费组 G2 来说,是完全没有影响的,G2 也可以分配它的消费者来占用和消费队列 Q2。
最后说一下消费位置,每个消费组内部维护自己的一组消费位置,每个队列对应一个消费位置。消费位置在服务端保存,并且,消费位置和消费者是没有关系的。每个消费位置一般就是一个整数,记录这个消费组中,这个队列消费到哪个位置了,这个位置之前的消息都成功消费了,之后的消息都没有消费或者正在消费。
我把咱们这个例子的消费位置整理成下面的表格,便于你理解。
你可以看到,这个表格中并没有消费者这一列,也就是说消费者和消费位置是没有关系的。

3. 如何实现单个队列的并行消费?

下面说一下《03 | 消息模型:主题和队列有什么区别?》这节课的思考题:如果不要求严格顺序,如何实现单个队列的并行消费?关于这个问题,有很多的实现方式,在 JMQ(京东自研的消息队列产品)中,它实现的思路是这样的。
比如说,队列中当前有 10 条消息,对应的编号是 0-9,当前的消费位置是 5。同时来了三个消费者来拉消息,把编号为 5、6、7 的消息分别给三个消费者,每人一条。过了一段时间,三个消费成功的响应都回来了,这时候就可以把消费位置更新为 8 了,这样就实现并行消费。
这是理想的情况。还有可能编号为 6、7 的消息响应回来了,编号 5 的消息响应一直回不来,怎么办?这个位置 5 就是一个消息空洞。为了避免位置 5 把这个队列卡住,可以先把消费位置 5 这条消息,复制到一个特殊重试队列中,然后依然把消费位置更新为 8,继续消费。再有消费者来拉消息的时候,优先把重试队列中的那条消息给消费者就可以了。
这是并行消费的一种实现方式。需要注意的是,并行消费开销还是很大的,不应该作为一个常规的,提升消费并发的手段,如果消费慢需要增加消费者的并发数,还是需要扩容队列数。

4. 如何保证消息的严格顺序?

很多同学在留言中问,怎么来保证消息的严格顺序?我们多次提到过,主题层面是无法保证严格顺序的,只有在队列上才能保证消息的严格顺序。
如果说,你的业务必须要求全局严格顺序,就只能把消息队列数配置成 1,生产者和消费者也只能是一个实例,这样才能保证全局严格顺序。
大部分情况下,我们并不需要全局严格顺序,只要保证局部有序就可以满足要求了。比如,在传递账户流水记录的时候,只要保证每个账户的流水有序就可以了,不同账户之间的流水记录是不需要保证顺序的。
如果需要保证局部严格顺序,可以这样来实现。在发送端,我们使用账户 ID 作为 Key,采用一致性哈希算法计算出队列编号,指定队列来发送消息。一致性哈希算法可以保证,相同 Key 的消息总是发送到同一个队列上,这样可以保证相同 Key 的消息是严格有序的。如果不考虑队列扩容,也可以用队列数量取模的简单方法来计算队列编号。

写在最后

在留言中,很多同学留言提出来,能不能讲一讲某个消息队列的某个功能具体如何配置。我的建议是,你先不要太关注功能、API 和配置这些细节,在学习如何使用消息队列的过程中,要保持一定的高度来学习。
因为使用消息队列,大部分的难点在宏观架构层面,要解决这些难点,你需要掌握消息队列宏观层面上的实现原理和最佳实践,这样,无论你使用什么消息队列,都可以做到游刃有余。在选定了合适的消息队列产品,准备写代码之前,再去文档中查看这些细节都来得及。
所以,我们专栏的“基础篇”讲消息队列的使用,更多讲的是一些通用的原理。这节课是我们消息队列高手课“基础篇”的最后一节课,完整基础篇的学习后,意味着你已经是一个使用消息队列的小达人了。
在“进阶篇”中,我们将把学习重点从“如何使用”转为“如何实现”,在学习消息队列的实现技术时,你反而要专注到每一个技术点上,深入下去,把每个细节都要搞清楚、学透。课程的深度、难度也会逐步加强,当然你获得的经验值也会更多。
希望大家一如既往坚持学习,多思考,多练习,跟老师一起打怪升级,成为真正的高手。
感谢阅读,如果你觉得这篇文章对你有帮助的话,也欢迎把它分享给你的朋友。
分享给需要的人,Ta购买本课程,你将得18
生成海报并分享

赞 71

提建议

上一篇
07 | 消息积压了该如何处理?
下一篇
09 | 学习开源代码该如何入手?
 写留言

精选留言(97)

  • 大龄小学生
    置顶
    2019-08-08
    老师,一图胜千言,来点图吧。

    作者回复: 你要的图来了。我在文中补充了一个流程图,便于同学们理解。

    共 3 条评论
    42
  • linqw
    2019-08-08
    学习完网关答疑篇,写下自己的理解和疑惑,老师有空帮忙看下哦 1、秒杀的理解: APP--发送秒杀请求--》网关(也是RPC服务端,和配置中心保持长连接,比如nacos,将其路由和配置信息定时的发送给配置中心,配置中心对其进行管理,定时的清除宕机的网关路由信息,如超过一定时间没有接收到网关的心跳包)--》将其APP请求做一定的封装,增加网关id和网关实例中唯一的请求id发送给消息队列,为了保证消息不丢失,网关对其发送消息出现的异常进行处理,如超时异常,直接返回秒杀失败,网关发送消息的这个过程中可能涉及到分布式事务,使用消息队列的分布式事务进行处理,然后网关需要等待一段时间,等待秒杀服务端使用RPC调用网关实例的接收秒杀结果,为此创建一个新对象,将其请求id做为key,新对象做为value放入CurrentMap中,调用新对象的超时wait方法进行等待秒杀结果--发送封装的APP请求,包含网关id和请求id--》消息队列接收APP请求消息,为了保证消息不丢失,开启Sync_Flush参数将消息保存到磁盘,并且为了防止一台机器磁盘出问题,集群需要2台机器都有消息才确认请求--从消息队列中拉取消息--》秒杀服务端,为了低延迟执行风控、预占库存,拿到消息中网关id,从本地路由中查询网关id的实例信息,如果获取不到调用网关实例时,需先从配置中心获取到网关的路由信息,秒杀服务端也需和配置中心保持长连接,定时的从配置中心拉取网关的路由信息,保存到本地,使用RPC调用网关实例的接收秒杀结果的方法,为了保证消息不丢失,先执行消费逻辑,再响应消息队列,如果根据网关id获取不到网关实例,或者确认消息队列超时或出现异常,秒杀服务端回滚事务,此过程也涉及到分布式事务,为了防止消费重复消息,接口的幂等性,将请求id和网关id做为唯一键。也为了防止消息积压,消息队列中的主题队列和消费组中的消费者一一对应,保证消息被快速消费。 2、秒杀异步,APP发送请求给网关,网关接收请求后将请求做一定的封装(包括请求id,网关id,账户id),然后发送到消息队列中,响应APP请求,无需等待后需的流程,然后秒杀成功以否直接返回,后续流程处理完使用短信的形式告知用户是否秒杀成功,不知道这样做法是否可行。 3、最近在撸rocketmq的源码,搞了namesrv、logging、logappend模块,想成为commiter,立个flag,等后续JMQ出来,撸其源码,也想成为commiter,道阻且长,持续进化。
    展开

    作者回复: 我认真的看了同学的对于秒杀的理解,技术上都没什么问题。 从业务角度,老师有一些不同的看法。 对于秒杀这种场景,宏观上的设计应该是倾向于利用有限的资源处理短时间内海量的请求,保证服务不宕机。有少量请求处理出错(注意是后端错误,用户不可见)或消息丢失,是可以接受的。 毕竟秒杀拼的就是运气,某个用户秒杀请求在处理的时候丢失,和处理成功但没秒到,对于用户来说都是运气不好而已。 基于这样的设计理念,很多保证数据可靠性的做法都可以牺牲掉,用于换取系统更大的吞吐量比较划算。

    共 2 条评论
    67
  • 滴流乱转小胖儿
    2019-08-08
    mq界,吴彦祖老师你好,感谢分享

    作者回复: 谢谢你,蔡徐坤同学。

    共 8 条评论
    37
  • 摩云神雕
    2019-09-20
    请教下老师,topic的partition数 是根据什么确定的? 我理解partition的设置 是为了一个消费组中多个消费者并行消费的, 那么partition数根据什么设定呢? 根据消费者数 和 broker机器的性能吗 假设我现在的kafka有 3个broker节点, 创建了一个topic, partition值设为5, 然后我的一个消费组中有5个消费者, 正好一个消费者 消费一个partition, 后续, 这个消费组 我又想加2个消费者, 呢partition数 也调成7 是吧? 但是, 我这个topic 可能被好几个消费组消费的, 你消费组A扩展了消费者数, 想扩展partition数, 但是 我消费组B没这个需求啊, 怎么办? 这个topic是消费组A、B、C、D大家共同订阅的, 就因为 你A组加了几个消费者, 我就去改这个topic的partition数吗? 这里没太想通; 另外,在生产环境中 改了partition数,会造成什么影响?
    展开

    作者回复: 多个消费组的时候,确实有你说的问题,partition数量需要兼顾所有消费者。一般的做法都是照顾消费最慢的那个消费组,按照它的速度和消费者数量来确定partition数量。 大部分MQ都支持动态扩容,增加分区数量,分区数量变更后,会重新分配消费者和分区对应关系,对生产基本没什么影响。

    共 5 条评论
    27
  • 虢國技醬
    2019-09-05
    "同一个消费组内,每个队列只能被一个消费者实例占用" 我在想:如果队列只有两个Q1和Q2,但是G1中有三个消费者实例C1,C2和C3;那会不会有一个消费者实例一直在偷懒(没有干活的机会,😂)。 对于这种情况会报错?还是说内部会有机制队列的占用是换着分配给消费者实例的(不是某个实例一直占用)?

    作者回复: 是的,这时候就会有一个消费者没有干活的机会。 至于,如何分配队列与消费者的关系,不同的消息队列处理也不一样,有的消息队列是绑定队列与消费者,这样有一个消费者一直闲着,其它二个一直干活。 也有的消息队列是分时绑定,也就是你干一会儿,我干一会儿,但任何时刻,都会有一个消费者处于闲着的状态。

    共 2 条评论
    24
  • 木木木
    2019-08-20
    关于有序性还有疑惑,即使采用了一致性hash,无论扩容还是缩容队列,对分配相邻队列的用户部分还是有影响的,难道要等这些队列消费完了,阻止生产者发消息吗?感觉不具有可操作性

    作者回复: 扩容后只需要等一会儿,确保扩容之前的消息都消费完成了(不确定的话可以等久一点儿也没关系)再消费新分区的数据就可以了,生产不需要停。 因为一致性哈希可以保证单调性:如果已经有一些内容通过哈希分派到了相应的分区中,又有新的分区加入到系统中。哈希的结果应能够保证原有已分配的内容可以被映射到原有的或者新的分区中去,而不会被映射到旧的分区集合中的其他分区

    19
  • 大魔王汪汪
    2019-08-08
    看了几期,感觉分布式消息队列的设计方案和分布式存储系统的设计方案很类似,如果再加上事务处理,存储细节方案应该更像了

    作者回复: 同学那不是像,消息队列就是个分布式存储系统。

    共 2 条评论
    13
  • Liam
    2019-08-14
    秒杀这个案例中,超时之后不需要补偿机制吗,对于下游服务来说很可能以及成功了

    作者回复: 这个案例中,你说的这种情况是有可能存在的。 是否需要补偿,也无所谓对错,总体效果是一样的。秒杀的目的就是从众多秒杀用户中公平的选择n个用户,补偿或不补偿,影响的只是这n个用户是谁的问题。 所以这是一个架构选择的问题。 我建议是不用补偿,按失败处理,锁定的库存超时未支付后会自动释放,好处是比较简单。

    共 2 条评论
    11
  • asdf100
    2019-08-22
    消息一直在队列里不删除?

    作者回复: 消息什么时候删除取决于消息队列的配置,比如Kafka默认就是超过多长时间后就自动删除了。

    共 2 条评论
    10
  • 大白先生
    2019-08-09
    老师,在秒杀场景中,后端服务通过调用onResult方法来放入秒杀结果,有没有可能有一种情况,就是秒杀成功了,但是获取mutex是空,这样的活,会不会前段接收了秒杀失败的提示,但是库存扣减成功了

    作者回复: 会有这种情况。但无需特殊处理。 一般秒杀成功后,还有后续的支付流程。 有些用户秒杀成功也不会支付。你说的这种情况与“秒杀成功不支付”一并处理就行了。一般都是有一个支付超时,超过这个时限还没支付,就取消这个秒杀资格,自动释放库存。

    共 2 条评论
    10
  • 托尼斯威特
    2019-11-04
    刚开始学习 RocketMQ. Consumer可以register的MessageListener 有两种: MessageListenerOrderly 和 MessageListenerConcurrently. 这和老师讲的, RocketMQ 每个queue只要一个单线程消费者矛盾吗?

    作者回复: 首先强调一下,并不是“每个队列只要一个单线程消费者”,而是“每个队列只能被一个消费者实例占用。” rocketMQ的MessageListenerConcurrently,和我们上面讲的内容也不矛盾,它这个并行消费是完全在客户端实现的。大致的原理就是: 1. 客户端从服务端的某个队列读取一批消息; 2. 分发给客户端的多个线程消费; 3. 都消费成功后,给服务端返回消费成功确认。

    共 5 条评论
    10
  • 切糕
    2019-11-03
    我是看了rocketMQ源码后在看了一遍,受益匪浅
    9
  • haha
    2019-08-08
    七夕不过节,继续学习 —— 谁让我是单身狗呢
    共 2 条评论
    9
  • 海罗沃德
    2019-12-09
    说到一致性哈希算法,王争老师的算法课程里专门有一课讲一致性哈希,跟李玥老师的课互相印证,算法就落地了

    作者回复: 其实这些最基础的算法和数据结构的应用是非常广泛的,只是为了方便,被很多中间件或者底层系统给封装后,对一般的业务开发人员就不可见了。

    8
  • 猿人谷
    2019-08-08
    这篇答疑解惑,虽然简短,但绝对的诚意十足。希望多出这种答疑解惑的章节,毕竟评论区里很多留言的问题非常有代表性,对代表性的问题出这种答疑解惑的章节,学到的更多,也更能体现大家的参与度。
    8
  • skyun
    2019-08-08
    关于顺序消息还是有点疑问,考虑一下几个场景: 1、消息消费失败进入了重试队列; 2、增加或减少了消费者组里的消费者个数,导致consumer与queue之间的对应关系重新映射; 3、broker节点故障或者超时导致投递失败,业务上重新选择broker投递 这几种场景下,即使在投递时通过selector进行选择了queue,我想依旧会导致消息的乱序,请教下老师,这对在这些场景下,得分别采用怎么样的方式来保证消息的有序性。问题有点多,麻烦老师了
    展开

    作者回复: a1:这个只能关闭重试队列。 a2: consumer rebalance不会影响到消费顺序,因为对于单个队列来说,无论是不是rebalance,都是串行消费的。 a3:这个无需担心,也不用做任何处理,消息队列自身可以保证即使broker宕机发生切换,依然保证队列中消息的顺序不会变。

    共 4 条评论
    6
  • vi
    2019-08-21
    李sir,一个迟来的学生,刚刚看到这里,对于rabbitmq来说,没有消费组队列,只有exchange转送到相应的对列中,要想提高并发,看到的方法可以设置多线程消息,好像变成了单个对列并行消息的模式,会不会也有文中所说的这个问题,rabbitmq还可以通过设置prefetch来缓存一定的数目,是不是就相当于增加每个消费者的队列数来解决并发的问题了

    作者回复: 想要增加消费并发,可以考虑exchange把消息平均的分摊到多个消费队列中。

    共 2 条评论
    5
  • Geek_821c96
    2020-05-29
    老师,你好。 秒杀结果的map是不是可以替换成Redis更好些?

    作者回复: 这是个选择的问题,无所谓哪个更好。使用Redis持久化秒杀请求后,引入了复杂度和故障点,系统维护更困难了,但提升了请求的成功率,秒杀服务变为无状态服务代码逻辑也更简单。

    4
  • 大白先生
    2019-08-08
    对于这句话我不是很理解,还请老师帮忙解答。“队列中当前有 10 条消息,对应的编号是 0-9,当前的消费位置是 5。同时来了三个消费者来拉消息,把编号为 5、6、7 的消息分别给三个消费者,每人一条。”一个消费队列在一个消费组内不是只有一个消费者可以消费消息么,怎么能够同时来拉消息。如果您说的三个消费者是三个消费组的话,每个消费组维护自己的消费位置都是5,那么消费的都会是编号5的消息呀。

    作者回复: 我们不是在考虑如何来实现并行消费么?就不能再遵循现有的串行消费的限制了呀。

    共 3 条评论
    4
  • 张三丰
    2019-10-21
    如果有一台机器宕机了,而宕机前某个队列记录的是这个账户的流水账,宕机后这个账户的流水账可能会转移到其他的队列,这个时候怎么保证这个账户的流水账是有序的呢?

    作者回复: 关于如何保证严格顺序的问题,我们在这节课中也有讲到哦。

    3