极客时间已完结课程限时免费阅读

10 | 生产者压缩算法面面观

10 | 生产者压缩算法面面观-极客时间

10 | 生产者压缩算法面面观

讲述:胡夕

时长12:17大小9.85M

你好,我是胡夕。今天我要和你分享的内容是:生产者压缩算法面面观。
说起压缩(compression),我相信你一定不会感到陌生。它秉承了用时间去换空间的经典 trade-off 思想,具体来说就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。在 Kafka 中,压缩也是用来做这件事的。今天我就来跟你分享一下 Kafka 中压缩的那些事儿。

怎么压缩?

Kafka 是如何压缩消息的呢?要弄清楚这个问题,就要从 Kafka 的消息格式说起了。目前 Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。
不论是哪个版本,Kafka 的消息层次都分为两层:消息集合(message set)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。
那么社区引入 V2 版本的目的是什么呢?V2 版本主要是针对 V1 版本的一些弊端做了修正,和我们今天讨论的主题相关的修正有哪些呢?先介绍一个,就是把消息的公共部分抽取出来放到外层消息集合里面,这样就不用每条消息都保存这些信息了。
我来举个例子。原来在 V1 版本中,每条消息都需要执行 CRC 校验,但有些情况下消息的 CRC 值是会发生变化的。比如在 Broker 端可能会对消息时间戳字段进行更新,那么重新计算之后的 CRC 值也会相应更新;再比如 Broker 端在执行消息格式转换时(主要是为了兼容老版本客户端程序),也会带来 CRC 值的变化。鉴于这些情况,再对每条消息都执行 CRC 校验就有点没必要了,不仅浪费空间还耽误 CPU 时间,因此在 V2 版本中,消息的 CRC 校验工作就被移到了消息集合这一层。
V2 版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。之前 V1 版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而 V2 版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。
我对两个版本分别做了一个简单的测试,结果显示,在相同条件下,不论是否启用压缩,V2 版本都比 V1 版本节省磁盘空间。当启用压缩时,这种节省空间的效果更加明显,就像下面这两张图展示的那样:

何时压缩?

在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。比如下面这段程序代码展示了如何构建一个开启 GZIP 的 Producer 对象:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启GZIP压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
这里比较关键的代码行是 props.put(“compression.type”, “gzip”),它表明该 Producer 的压缩算法使用的是 GZIP。这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。
在生产者端启用压缩是很自然的想法,那为什么我说在 Broker 端也可能进行压缩呢?其实大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但这里的“大部分情况”也是要满足一定条件的。有两种例外情况就可能让 Broker 重新压缩消息。
情况一:Broker 端指定了和 Producer 端不同的压缩算法。
先看一个例子。想象这样一个对话。
Producer 说:“我要使用 GZIP 进行压缩。”
Broker 说:“不好意思,我这边接收的消息必须使用 Snappy 算法进行压缩。”
你看,这种情况下 Broker 接收到 GZIP 压缩消息后,只能解压缩然后使用 Snappy 重新压缩一遍。如果你翻开 Kafka 官网,你会发现 Broker 端也有一个参数叫 compression.type,和上面那个例子中的同名。但是这个参数的默认值是 producer,这表示 Broker 端会“尊重”Producer 端使用的压缩算法。可一旦你在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,通常表现为 Broker 端 CPU 使用率飙升。
情况二:Broker 端发生了消息格式转换。
所谓的消息格式转换主要是为了兼容老版本的消费者程序。还记得之前说过的 V1、V2 版本吧?在一个生产环境中,Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。
所谓“Zero Copy”就是“零拷贝”,我在专栏第 6 期提到过,说的是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝,从而实现快速的数据传输。因此如果 Kafka 享受不到这个特性的话,性能必然有所损失,所以尽量保证消息格式的统一吧,这样不仅可以避免不必要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。如果有兴趣你可以深入地了解下 Zero Copy 的原理。

何时解压缩?

有压缩必有解压缩!通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。
那么现在问题来了,Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。如果用一句话总结一下压缩和解压缩,那么我希望你记住这句话:Producer 端压缩、Broker 端保持、Consumer 端解压缩。
除了在 Consumer 端解压缩,Broker 端也会进行解压缩。注意了,这和前面提到消息格式转换时发生的解压缩是不同的场景。每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证。我们必须承认这种解压缩对 Broker 端性能是有一定影响的,特别是对 CPU 的使用率而言。
事实上,最近国内京东的小伙伴们刚刚向社区提出了一个 bugfix,建议去掉因为做消息校验而引入的解压缩。据他们称,去掉了解压缩之后,Broker 端的 CPU 使用率至少降低了 50%。不过有些遗憾的是,目前社区并未采纳这个建议,原因就是这种消息校验是非常重要的,不可盲目去之。毕竟先把事情做对是最重要的,在做对的基础上,再考虑把事情做好做快。针对这个使用场景,你也可以思考一下,是否有一个两全其美的方案,既能避免消息解压缩也能对消息执行校验。

各种压缩算法对比

那么我们来谈谈压缩算法。这可是重头戏!之前说了这么多,我们还是要比较一下各个压缩算法的优劣,这样我们才能有针对性地配置适合我们业务的压缩策略。
在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。它是 Facebook 开源的一个压缩算法,能够提供超高的压缩比(compression ratio)。
对了,看一个压缩算法的优劣,有两个重要的指标:一个指标是压缩比,原先占 100 份空间的东西经压缩之后变成了占 20 份空间,那么压缩比就是 5,显然压缩比越高越好;另一个指标就是压缩 / 解压缩吞吐量,比如每秒能压缩或解压缩多少 MB 的数据。同样地,吞吐量也是越高越好。
下面这张表是 Facebook Zstandard 官网提供的一份压缩算法 benchmark 比较结果:
从表中我们可以发现 zstd 算法有着最高的压缩比,而在吞吐量上的表现只能说中规中矩。反观 LZ4 算法,它在吞吐量方面则是毫无疑问的执牛耳者。当然对于表格中数据的权威性我不做过多解读,只想用它来说明一下当前各种压缩算法的大致表现。
在实际使用中,GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。但对于 Kafka 而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。具体到物理资源,使用 Snappy 算法占用的网络带宽最多,zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比;在 CPU 使用率方面,各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU。

最佳实践

了解了这些算法对比,我们就能根据自身的实际情况有针对性地启用合适的压缩算法。
首先来说压缩。何时启用压缩是比较合适的时机呢?
你现在已经知道 Producer 端完成的压缩,那么启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。
除了 CPU 资源充足这一条件,如果你的环境中带宽资源有限,那么我也建议你开启压缩。事实上我见过的很多 Kafka 生产环境都遭遇过带宽被打满的情况。这年头,带宽可是比 CPU 和内存还要珍贵的稀缺资源,毕竟万兆网络还不是普通公司的标配,因此千兆网络中 Kafka 集群带宽资源耗尽这件事情就特别容易出现。如果你的客户端机器 CPU 资源有很多富余,我强烈建议你开启 zstd 压缩,这样能极大地节省网络资源消耗。
其次说说解压缩。其实也没什么可说的。一旦启用压缩,解压缩是不可避免的事情。这里只想强调一点:我们对不可抗拒的解压缩无能为力,但至少能规避掉那些意料之外的解压缩。就像我前面说的,因为要兼容老版本而引入的解压缩操作就属于这类。有条件的话尽量保证不要出现消息格式转换的情况。

小结

总结一下今天分享的内容:我们主要讨论了 Kafka 压缩的各个方面,包括 Kafka 是如何对消息进行压缩的、何时进行压缩及解压缩,还对比了目前 Kafka 支持的几个压缩算法,最后我给出了工程化的最佳实践。分享这么多内容,我就只有一个目的:就是希望你能根据自身的实际情况恰当地选择合适的 Kafka 压缩算法,以求实现最大的资源利用率。

开放讨论

最后给出一道作业题,请花时间思考一下:前面我们提到了 Broker 要对压缩消息集合执行解压缩操作,然后逐条对消息进行校验,有人提出了一个方案:把这种消息校验移到 Producer 端来做,Broker 直接读取校验结果即可,这样就可以避免在 Broker 端执行解压缩操作。你认同这种方案吗?
欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。
分享给需要的人,Ta购买本课程,你将得20
生成海报并分享

赞 35

提建议

上一篇
09 | 生产者消息分区机制原理剖析
下一篇
11 | 无消息丢失配置怎么实现?
unpreview
 写留言

精选留言(93)

  • 胡夕
    置顶
    2019-06-26
    刚刚看到4天前京东提的那个jira已经修复了,看来规避了broker端为执行校验而做的解压缩操作,代码也merge进了2.4版本。有兴趣的同学可以看一下:https://issues.apache.org/jira/browse/KAFKA-8106
    共 5 条评论
    108
  • 常超
    2019-06-28
    文中对于消息结构的描述,确实引起了一些混乱,下面试图整理一下,希望对大家有帮助。 消息(v1叫message,v2叫record)是分批次(batch)读写的,batch是kafka读写(网络传输和文件读写)的基本单位,不同版本,对相同(或者叫相似)的概念,叫法不一样。 v1(kafka 0.11.0之前):message set, message v2(kafka 0.11.0以后):record batch,record 其中record batch对英语message set,record对应于message。 一个record batch(message set)可以包含多个record(message)。 对于每个版本的消息结构的细节,可以参考kafka官方文档的5.3 Message Format 章,里面对消息结构列得非常清楚。
    展开
    共 6 条评论
    113
  • Geek_8441fd
    2019-06-25
    broker端校验可以分两步走。 第1步,message set 层面,增加一个 crc,这样可以不用解压缩,直接校验压缩后的数据。 如果校验不成功,说明message set 中有损坏的message; 这时,再做解压操作,挨个校验message,找出损坏的那一个。 这样的话,绝大部分情况下,是不用做解压操作的;只有在确实发生错误时,才需要解压。 请指正。
    展开

    作者回复: 嗯嗯,挺好的。我自己也学到了一些。另外校验不仅仅是CRC校验,还有消息级别的检查。

    共 5 条评论
    54
  • 你好旅行者
    2019-06-25
    不行吧,校验操作应该也是为了防止在网络传输的过程中出现数据丢失的情况,在Producer端做完校验之后如果在传输的时候出现了错误,那这个校验就没有意义了。 我有一个问题想请教老师,如果每次传到Broker的消息都要做一次校验,那是不是都要把消息从内核态拷贝到用户态做校验?如果是这样的话那零拷贝机制不是就没有用武之地了?
    共 12 条评论
    46
  • Li Shunduo
    2019-06-25
    假如一个消息集合里有10条消息,并且被压缩,但是消费端配置每次只poll 5条消息。这种情况下,消费端怎么解压缩?矛盾点是 如果只取5条消息,需要broker帮助解压缩;如果取整个消息集合10条消息,会有贷款等资源的浪费?

    作者回复: 目前java consumer的设计是一次取出一批,缓存在客户端内存中,然后再过滤出max.poll.records条消息返给你,也不算太浪费吧,毕竟下次可以直接从缓存中取,不用再发请求了。

    共 5 条评论
    26
  • 风中花
    2019-06-26
    胡老师您好! 我们已经学历10多节课了! 针对我们得留言和反馈,不知道您有没有给我们一些后续得课程得学习建议和方法?我目前得学习就是您告诉我们得,我必须学会记住。但是看同学们得评论和反馈,我觉得貌似还有很多很多知识啊且不知也不懂,故有此一问!希望老师能给与一点一点学习建议? 感谢老师

    作者回复: 个人觉得学一个东西最重要的还是要用,如果只是参加一些培训课程很难全面的理解。您这么多的留言我一直坚持回复。我也一直是这个观点:用起来,自然问题就来了。 我学机器学习的经历和您现在学Kafka很像。没有实际使用场景怎么学都觉得深入不了。 我给您的建议是:把Kafka官网通读几遍然后再实现一个实时日志收集系统(比如把服务器日志实时放入Kafka)。事实上,能把官网全面理解的话已经比很多Kafka使用者要强了。

    共 2 条评论
    20
  • Hello world
    2019-06-25
    做一个笔记 怎么压缩: 1、新版本改进将每个消息公共部分取出放在外层消息集合,例如消息的 CRC 值 2、新老版本的保存压缩消息的方法变化,新版本是对整个消息集合进行压缩 何时压缩: 1、正常情况下都是producer压缩,节省带宽,磁盘存储 2、例外情况 a、broker端和producer端使用的压缩方法不同 b、broker与client交互,消息版本不同 何时解压缩: 1、consumer端解压缩 2、broker端解压缩,用来对消息执行验证 优化:选择适合自己的压缩算法,是更看重吞吐量还是压缩率。其次尽量server和client保持一致,这样不会损失kafka的zero copy优势
    展开
    共 1 条评论
    20
  • 衣申人
    2019-12-11
    老师,我看源码,broker在接收producer消息并落盘这块貌似没有用零拷贝啊!只有传输给consumer时用了,求解答

    作者回复: 是的,就是你理解的那样。Kafka使用Zero Copy优化将页缓存中的数据直接传输到Socket——这的确是发生在broker到consumer的链路上。这种优化能成行的前提是consumer端能够识别磁盘上的消息格式。

    共 3 条评论
    19
  • 星期八
    2019-07-12
    老师那再问一下,如果多条消息组成消息集合发送,那是什么条件控制消息发送,如果是一条又是什么条件控制触发发送的呢

    作者回复: 主要是这两个参数:batch.size和linger.ms。如果是生产了一条消息且linger.ms=0,通常producer就会立即发送者一条消息了。

    共 2 条评论
    15
  • cricket1981
    2019-06-25
    校验的目的是防止因为网络传输出现问题导致broker端接收了受损的消息,所以应该放在作为serverr broker端进行,而不是在作为client端的producer。改进的方案可以是针对一次传输的整个message set进行CRC检验,而不是针对一条条消息,这能够大大提高校验效率,因为避免了解压缩。
    15
  • 南辕北辙
    2019-06-27
    老师有一点不是很明白,在正常情况下broker端会原样保存起来,但是为了检验需要解压缩。该怎么去理解这个过程呢,broker端解压缩以后还会压缩还原吗? 这个过程是在用户态执行的吗,总感觉怪怪的

    作者回复: 它只是解压缩读取而已,不会将解压缩之后的数据回写到磁盘。另外就像我置顶的留言那样,目前社区已经接纳了京东小伙伴的修改,貌似可以绕过这部分解压缩了,。

    共 3 条评论
    12
  • dream
    2019-06-25
    老师,我对消息层次、消息集合、消息、日志项这些概念与它们之间的关系感觉很懵, 消息层次都分消息集合以及消息,消息集合中包含日志项,日志项中封装消息, 那么日志项中封装的是producer发送的消息吗? 一个日志项中会包含多条消息吗? 消息集合中消息项封装的的消息与消息层次包含的消息有什么关系呢? 这两个消息与producer发送的消息有什么关系呢? 一个消息集合对应是producer发送的一条消息还是多条消息呢? 最后,老师能不能详细说一下CRC校验,谢谢!
    展开

    作者回复: 消息批次RecordBatch里面包含若干条消息(record)。 你可以认为消息批次和消息集合是等价的,消息和日志项是等价的。 这样消息层次有两层:外层是消息批次(或消息集合);里层是消息(或日志项)。 Producer以recordbatch为单位发送消息,对于V2版本一个batch中通常包含多条消息。 在V2版本中,在batch层面计算CRC值;在V1版本中,每条消息都要计算CRC值。

    共 6 条评论
    12
  • dream
    2019-06-25
    老师,对于消息层次、消息集合、消息这三者的概念与关系我有点懵,能不能详细说一下?谢谢!
    共 2 条评论
    11
  • 曾轼麟
    2019-06-25
    不认同,因为网络传输也会造成丢失,但是我建议可以在消息里面使用一种消耗较小的签名方式sign,比如多使用位移等方式,broke端也这么操纵,如果签名不一致证明有数据丢失,同时签名的方式可以避免CPU大量消耗

    作者回复: 有一定道理。有机会我向社区反馈下:)

    共 3 条评论
    10
  • 南辕北辙
    2019-07-01
    老师有一点有点迷惑,broker为了多版本消息兼容,意思是一份消息有多个版本存在吗,是这个意思吗?

    作者回复: 同一台broker上可能存在多个版本的消息,但每条消息只会以1个版本的形式保存。

    共 3 条评论
    8
  • 2019-06-25
    我看了三遍老师的课,得到了我要的答案: 1.如果生产者使用了压缩,broker为了crc校验,会启动解压,这个解压过程不可避免; 2.v2的broker为了低版本的消费者,会把消息再次解压并进行协议转换。 所以消费者的兼容成本较大,需要避免这个情况。
    共 2 条评论
    8
  • 代码小生
    2019-09-18
    原来在 V1 版本中,每条消息都需要执行 CRC 校验,但是CRC在某些情况下会变化,所以crc拿到消息集和中更好,这个逻辑我没有明白呢,既然CRC会变,为了消息的正确性不更应该每条消息都校验吗?为什么说拿到消息集和中一次校验更好呢?

    作者回复: V2依然是做CRC校验的,只不过是在record batch这个层级上做,而不是一条一条消息地做了。如果CRC校验失败,重传batch。也就是说不会以消息作为传输单位进行校验,这样效率太低

    7
  • What for
    2019-08-08
    老师您好,您的课程很棒,又很实用又有原理性的分析! 我想问一个问题,Producer 发送数据时以批次为单位,那么 batch 与 broker 端的消息集合又是怎么样的对应关系呢?每个消息集合的 record 数量是否固定呢? 就是说在 Producer 端即使消息并没有达到 batch.size 的数量,linger.ms 也可以让它发送一批数据,那 broker 在低峰期的时候收到一批数据之后是会写入缓存等凑够一定数量组成一个消息集合还是说会立即(或设置超时时间)组成一个消息集合写入磁盘? 谢谢!
    展开

    作者回复: 不是固定数量。 “在 Producer 端即使消息并没有达到 batch.size 的数量,linger.ms 也可以让它发送一批数据” --- 是的 取决于linger.ms的值

    共 4 条评论
    6
  • juan
    2019-07-09
    如果在配置中不指定压缩算法,kafka有默认的压缩算法吗?

    作者回复: 没有

    共 2 条评论
    5
  • 莫问流年
    2019-06-25
    我觉得消息检验放在producer端是不合理的。首先,检验应该是消息接收方broker的责任,每个发送方不应该承担这种公共的检验工作,也不利于扩展。其次,发送方producer检验影响了producer的性能,而且并不能保证消息到达broker后依然正确。 另外,想请教下老师,broker对消息进行的检验一般有哪些?

    作者回复: 比如compact类型的topic要检查key是否存在等。有个好消息是我看最新的2.4已经接收了京东同学提的bug而且也修正了。尚不知道京东方面是怎么解决的,有兴趣的话可以看一下:https://issues.apache.org/jira/browse/KAFKA-8106

    5