极客时间已完结课程限时免费阅读

36 | Facebook游戏实时流处理Beam Pipeline实战(下)

36 | Facebook游戏实时流处理Beam Pipeline实战(下)-极客时间

36 | Facebook游戏实时流处理Beam Pipeline实战(下)

讲述:巴莫

时长08:54大小8.14M

你好,我是蔡元楠。
在上一讲中,我们一起对怎样实现一个简易的游戏积分排行榜展开了讨论,也一起研究了如何使用批处理计算的方式在 Beam 中构建出一个数据流水线来得出排行榜结果。
我们知道,虽然批处理计算可以得到一个完整的结果,但是它也存在着自身的不足,比如会有一定的延时,需要额外的 crontab 来管理定时任务,增加了维护成本等等。
所以在上一讲的末尾,我们提出了使用实时流处理来改进这些不足,而其中就需要用到窗口、触发器和累加模式这几个概念。
相信学习了第 32 讲的内容后,你对于窗口在 Beam 中是如何运作的,已经比较了解了。对于有效时间为一周的积分排行榜来说,我们可以赋予一个“窗口时长为一周的固定窗口”给数据流水线。也就是说,我们最终的结果会按照每一周的时长来得出。
那接下来的问题就剩下我们怎么定义触发器和累加模式了。
首先,我想先讲讲触发器在 Beam 中是怎么运作的。在第 23 讲中,我们已经了解了触发器在 Beam 中的作用。它是用于告诉数据流水线,什么时候需要计算一遍落在窗口中的所有数据的。这在实时流处理中尤为重要。
在实时流处理当中,我们总是需要在数据结果的完整性延迟性上做出一些取舍。
如果我们设置的触发器比较频繁,例如说每隔几分钟甚至是几秒钟,或者是在时间上很早发生的话,那就表示我们更倾向于数据流水线的延时比较小,但是不一定能够获得完整的数据。
如果我们设置的触发器是比较长时间的,像每隔一个小时才会触发一次窗口中的计算的话,那就表示我们更希望获得完整的数据集来得到最终结果。
为什么这么说呢?
因为在现实世界中,我们是没有办法保证数据流水线可以在某一刻能够得到在这一刻之前所产生的所有数据的。
就拿获得这个游戏积分排行榜的数据为例子来说明一下。
在现实生活中,可能很多用户是用手机来通关游戏,并且上传通关时间和积分的。有的地方可能因为信号差,上传数据会有很大的延迟。甚至可能有这样的情况:有一些用户是在坐飞机的时候玩的游戏(在飞行模式之下完成各种通关),等到飞机降落,手机重新有了信号之后,数据才被上传到服务器,这时候可能已经有了好几个小时的延时了。
如果提早触发窗口中的数据计算,可能会有很多“迟到”的数据未被纳入最终结果中,而这些“迟到”的数据有可能又会影响到游戏积分排行榜。
所以,在很多复杂的场景下,我们希望尽可能地将所有数据归入正确的时间窗口中,而且还要能够得到正确的结果。因此,除了要对触发器进行设置之外,我们还需要设置到底应不应该在一些“迟到”的数据来到的时候,重新计算一下整个结果。
在 Beam 中,我们可以为 PCollection 设置的触发器有 4 种模式:
1.基于事件时间的触发器(Event-Time Trigger)
如果设置了基于事件时间的触发器,那所有的计算都是基于 PCollection 中所有元素的事件时间的。
如果我们不显式地设置触发器的话,Beam 的默认触发器就是基于事件时间的,如果要显式地设置基于事件时间的触发器,可以使用 AfterWatermark 类进行设置。
2.基于处理时间触发器(Process-Time Trigger)
如果设置了基于处理时间的触发器,那一个 PCollection 中的所有元素都会在数据流水线中的某一个时刻被处理。如果要显式地设置基于处理时间的触发器,可以使 AfterProcessingTime 类进行设置。
3.数据驱动的触发器(Data-Driven Trigger)
数据驱动的触发器一般是在每个元素到达每个窗口时,通过检查这个元素是否满足某个属性来触发的。
就好像我在第 23 讲所举的例子一样,检查元素是否在窗口中到达一定的数量,然后触发计算就是数据驱动的触发器的一种,在 Beam 中,可以使用 AfterPane.elementCountAtLeast() 函数来配置。
4.复合触发器(Composite Trigger)
复合触发器其实就是由上面所说三种基本触发器组合而成的。在第 23 讲中,我举过一个触发器的例子,例子中至少要等到有两个交易数据到达后才能触发计算。
有同学在留言中问我,如果现实中只有一个数据到达窗口,那岂不是永远都触发不了计算了?其实,这个时候就可以定义一个复合触发器,可以定义成累积有超过两个元素落入窗口中或者是每隔一分钟触发一次计算的复合触发器。
而像我之前提到的,如果我们需要处理“迟到”的数据,那在 Beam 中又是怎么操作呢?我们可以使用 withAllowedLateness 这个在 Window 类里定义好的函数,方法签名如下:
Java
public Window<T> withAllowedLateness(Duration allowedLateness);
这个函数接收的参数就是我们希望允许多久的“迟到”数据可以被纳入计算中。
最后需要说明的是累加模式。
在 Beam 中,我们可以设置两种累加模式,分别是丢弃模式累积模式。它们可以分别通过 Window 类里的函数 discardingFiredPanes() 和 accumulatingFiredPanes() 来设置。
好了,那现在回到我们的积分排行榜问题当中。
虽然我们对输入数据集设定的窗口是一个窗口时长为 1 周的固定窗口,但是我们也需要尽可能地在近乎实时的状态下更新排行榜。所以,我们可以设置数据流水线在每 5 分钟更新一次。
那我们接受“迟到”多久的数据呢?
我在网上查询了一下,现在飞机航班直飞耗时最长的是新加坡飞往纽约的航班,大概需要 19 个小时。如果玩游戏的用户恰好也在这趟航班上,那么可能数据的延时可能就会超过 19 个小时了。那我们就设定允许“迟到”20 个小时的数据也纳入我们的窗口计算当中。
一般情况下,我们可以从 Pub/Sub 数据流中读取实时流数据。为了简化数据流水线的逻辑,不在数据流水线中保存中间状态,我们现在假设在实际操作的时候,服务器已经判断好某一用户的分数是否是最高分,如果是最高分的话,再通过 Pub/Sub 将数据传入流水线。
这时,我们的累加模式可以定义为丢弃模式,也就是只保留最新的结果。
为此,我们可以写出一个 Transform 来设置所有上述描述的概念,分别是:
设置窗口时长为 1 周的固定窗口。
每隔 5 分钟就会计算一次窗口内数据的结果。
允许“迟到”了 20 个小时的数据被重新纳入窗口中计算。
采用丢弃模式来保存最新的用户积分。
Java
static class ConfigUserScores extends PTransform<PCollection<UserScoreInfo>, PCollection<UserScoreInfo>> {
private final Duration FIXED_WINDOW_SIZE = Duration.standardDays(7);
private final Duration FIVE_MINUTES = Duration.standardMinutes(5);
private final Duration TWENTY_HOURS = Duration.standardHours(20);
@Override
public PCollection<UserScoreInfo> expand(PCollection<UserScoreInfo> infos) {
return infos.apply(
Window.<UserScoreInfo>into(FixedWindows.of(FIXED_WINDOW_SIZE))
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES))
.withLateFirings(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)))
.withAllowedLateness(TWENTY_HOURS)
.discardingFiredPanes());
}
}
有了这个 Transform 去设定好我们在实时流处理中如何处理数据之后,我们其实只需要修改之前批处理数据流水线中很小的一部分,就可以达到我们想要的结果了。
Java
...
pipeline.apply(
KafkaIO.<String>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("user_scores")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withLogAppendTime())
.apply("ConvertUserScoreInfo", ParDo.of(new ConvertUserScoreInfoFn()))
.apply("ConfigUserScores", new ConfigUserScores())
.apply("RetrieveTop100Players", new ExtractUserAndScore())
...
如代码所示,真正做出修改的地方就是将读取输入数据集的 BigTableIO 改成使用 KafkaIO 来读取。将批处理的两个 Filter Transform 替换成我们自定义的 ConfigUserScores Transform。
到此为止,我们就可以“一劳永逸”,运行一个实时流处理的数据流水线来得到游戏积分排行榜结果了。

小结

今天我们一起设计了一个实时流处理的数据流水线,来完成之前自定义的一个简单游戏积分排行榜。
这里面有不少经验是值得我们在处理现实的应用场景中借鉴的。比如,我们应该考虑数据结果的完整性有多重要、我们能接受多大的延迟、我们是否接受晚来的数据集等等。
这些问题其实又回到了第 23 讲中提到过的——我们在解决现实问题时,应该回答好的“WWWH”这四个问题。

思考题

今天我们一起探讨了如何利用实时流处理的方式来解决游戏积分排行榜的问题,里面涉及了配置窗口,触发器和累加模式。这些配置可能还不是最优的,你觉得我们还有什么地方可以进行优化的呢?
欢迎你把自己的学习体会写在留言区,与我和其他同学一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。
分享给需要的人,Ta购买本课程,你将得18
生成海报并分享

赞 2

提建议

上一篇
35 | Facebook游戏实时流处理Beam Pipeline实战(上)
下一篇
37 | 5G时代,如何处理超大规模物联网数据
unpreview
 写留言

精选留言(8)

  • 微思
    2019-07-19
    老师讲的很好!要是能提供一个完整的案例,包括测试数据和运行时,不需要读者折腾太多,下载下来直接就能运行,相信会引起更多的共鸣,将这个专栏衬托得更加精彩!
    共 1 条评论
    6
  • 时光机器
    2020-01-12
    感觉这个例子没有提现出流式计算的实时特性呀。老师能举个像阿里实时战报看板这样高实时性要求的例子吗,感谢感谢
    3
  • 之渊
    2020-08-24
    参考https://blog.csdn.net/a799581229/article/details/106444576 这个博客入门以及触发器说明的也不错
    1
  • stephen
    2020-09-16
    如果窗口跟滑动步长一样或者更大,比如推荐点击率,20秒窗口,每20秒计算一次,也要允许延迟前一个窗口期甚至几个窗口期的延迟数据重新计算,这种beam能好的支持么?还是说只能借助外部hbase一类实时转储结果?
  • Geeker
    2020-03-07
    例子很好!

    作者回复: 谢谢

    1
  • Cool
    2019-09-26
    觉得这些例子逻辑上还是相对来说比较简单, 流式处理当输入源是多个的时候, 比如对于交易所来说 一个是实时 trade, 一个是实时的 price,都使用相同的 fix_window, join 起来之后,再做计算输出等等
    1
  • Cool
    2019-09-26
    蔡老师, 对于流处理需要对pipeline中的数据,进行数据补充时,可以使用 sideinput, 但是我看了官方文档,只能是静态的metadata,然后再Pardo中加到每一条数据, 并不能动态更新这个sideinput(比如在数据库中动态查询), 请问这种情况能怎么解决?
    共 1 条评论
  • 李孟
    2019-07-24
    老师我想问下, PCollection<String>这个种懒加载出来的集合怎么转存成临时的list集合?
    共 1 条评论