极客时间已完结课程限时免费阅读

12 | 客户端都有哪些不常见但是很高级的功能?

12 | 客户端都有哪些不常见但是很高级的功能?-极客时间

12 | 客户端都有哪些不常见但是很高级的功能?

讲述:胡夕

时长11:03大小8.86M

你好,我是胡夕。今天我要和你分享的主题是:客户端都有哪些不常见但是很高级的功能。
既然是不常见,那就说明在实际场景中并没有太高的出场率,但它们依然是很高级很实用的。下面就有请今天的主角登场:Kafka 拦截器。

什么是拦截器?

如果你用过 Spring Interceptor 或是 Apache Flume,那么应该不会对拦截器这个概念感到陌生,其基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。下面这张图展示了 Spring MVC 拦截器的工作原理:
拦截器 1 和拦截器 2 分别在请求发送之前、发送之后以及完成之后三个地方插入了对应的处理逻辑。而 Flume 中的拦截器也是同理,它们插入的逻辑可以是修改待发送的消息,也可以是创建新的消息,甚至是丢弃消息。这些功能都是以配置拦截器类的方式动态插入到应用程序中的,故可以快速地切换不同的拦截器而不影响主程序逻辑。
Kafka 拦截器借鉴了这样的设计思路。你可以在消息处理的前后多个时点动态植入不同的处理逻辑,比如在消息发送前或者在消息被消费后。
作为一个非常小众的功能,Kafka 拦截器自 0.10.0.0 版本被引入后并未得到太多的实际应用,我也从未在任何 Kafka 技术峰会上看到有公司分享其使用拦截器的成功案例。但即便如此,在自己的 Kafka 工具箱中放入这么一个有用的东西依然是值得的。今天我们就让它来发挥威力,展示一些非常酷炫的功能。

Kafka 拦截器

Kafka 拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。
举个例子,假设你想在生产消息前执行两个“前置动作”:第一个是为消息增加一个头信息,封装发送该消息的时间,第二个是更新发送消息数字段,那么当你将这两个拦截器串联在一起统一指定给 Producer 后,Producer 会按顺序执行上面的动作,然后再发送消息。
当前 Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。拿上面的例子来说,假设第一个拦截器的完整类路径是 com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个类是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定拦截器:
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……
现在问题来了,我们应该怎么编写 AddTimeStampInterceptor 和 UpdateCounterInterceptor 类呢?其实很简单,这两个类以及你自己编写的所有 Producer 端拦截器实现类都要继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口。该接口是 Kafka 提供的,里面有两个核心的方法。
onSend:该方法会在消息发送之前被调用。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会。
onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。还记得我在上一期中提到的发送回调通知 callback 吗?onAcknowledgement 的调用要早于 callback 的调用。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。
同理,指定消费者拦截器也是同样的方法,只是具体的实现类要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,这里面也有两个核心方法。
onConsume:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。
onCommit:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
一定要注意的是,指定拦截器类时要指定它们的全限定名,即 full qualified name。通俗点说就是要把完整包名也加上,不要只有一个类名在那里,并且还要保证你的 Producer 程序能够正确加载你的拦截器类。

典型使用场景

Kafka 拦截器都能用在哪些地方呢?其实,跟很多拦截器的用法相同,Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景
我以端到端系统性能检测和消息审计为例来展开介绍下。
今天 Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,你很难从具体的消息维度去追踪集群间消息的流转路径。同时,如何监控一条消息从生产到最后消费的端到端延时也是很多 Kafka 用户迫切需要解决的问题。
从技术上来说,我们可以在客户端程序中增加这样的统计逻辑,但是对于那些将 Kafka 作为企业级基础架构的公司来说,在应用代码中编写统一的监控逻辑其实是很难的,毕竟这东西非常灵活,不太可能提前确定好所有的计算逻辑。另外,将监控逻辑与主业务逻辑耦合也是软件工程中不提倡的做法。
现在,通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够从具体的消息层面上去收集这些数据。这就是 Kafka 拦截器的一个非常典型的使用场景。
我们再来看看消息审计(message audit)的场景。设想你的公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要涉及多租户以及消息审计的功能。
作为私有云的 PaaS 提供方,你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器。

案例分享

下面我以一个具体的案例来说明一下拦截器的使用。在这个案例中,我们通过编写拦截器类来统计消息端到端处理的延时,非常实用,我建议你可以直接移植到你自己的生产环境中。
我曾经给一个公司做 Kafka 培训,在培训过程中,那个公司的人提出了一个诉求。他们的场景很简单,某个业务只有一个 Producer 和一个 Consumer,他们想知道该业务消息从被生产出来到最后被消费的平均总时长是多少,但是目前 Kafka 并没有提供这种端到端的延时统计。
学习了拦截器之后,我们现在知道可以用拦截器来满足这个需求。既然是要计算总延时,那么一定要有个公共的地方来保存它,并且这个公共的地方还是要让生产者和消费者程序都能访问的。在这个例子中,我们假设数据被保存在 Redis 中。
Okay,这个需求显然要实现生产者拦截器,也要实现消费者拦截器。我们先来实现前者:
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
private Jedis jedis; // 省略Jedis初始化
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
jedis.incr("totalSentMessage");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<java.lang.String, ?> configs) {
}
上面的代码比较关键的是在发送消息前更新总的已发送消息数。为了节省时间,我没有考虑发送失败的情况,因为发送失败可能导致总发送数不准确。不过好在处理思路是相同的,你可以有针对性地调整下代码逻辑。
下面是消费者端的拦截器实现,代码如下:
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis; //省略Jedis初始化
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
在上面的消费者拦截器中,我们在真正消费一批消息前首先更新了它们的总延时,方法就是用当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到 Redis 中。之后的逻辑就很简单了,我们分别从 Redis 中读取更新过的总延时和总消息数,两者相除即得到端到端消息的平均处理延时。
创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。

小结

今天我们花了一些时间讨论 Kafka 提供的冷门功能:拦截器。如之前所说,拦截器的出场率极低,以至于我从未看到过国内大厂实际应用 Kafka 拦截器的报道。但冷门不代表没用。事实上,我们可以利用拦截器满足实际的需求,比如端到端系统性能检测、消息审计等。
从这一期开始,我们将逐渐接触到更多的实际代码。看完了今天的分享,我希望你能够亲自动手编写一些代码,去实现一个拦截器,体会一下 Kafka 拦截器的功能。要知道,“纸上得来终觉浅,绝知此事要躬行”。也许你在敲代码的同时,就会想到一个使用拦截器的绝妙点子,让我们拭目以待吧。

开放讨论

思考这样一个问题:Producer 拦截器 onSend 方法的签名如下:
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
如果我实现的逻辑仅仅是 return null,你觉得 Kafka 会丢弃该消息,还是原封不动地发送消息?请动手试验一下,看看结果是否符合你的预期。
欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。
分享给需要的人,Ta购买本课程,你将得20
生成海报并分享

赞 20

提建议

上一篇
11 | 无消息丢失配置怎么实现?
下一篇
13 | Java生产者是如何管理TCP连接的?
unpreview
 写留言

精选留言(51)

  • 风中花
    2019-07-04
    胡老师您好! 我公司现在就我一个人懂点kafka,但是公司线下却有使用kafka,现在知道我学习这个就交给我了,我现在遇到一个线上问题:消息经常堆积起来,不能消费了,重启服务就能继续消费了。我目前得能力还搞不定,望老师能给指点一二 。谢谢。谢谢

    作者回复: 消息堆积可能原因如下:1. 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS;2. consumer消费性能低,查一下是否有很重的消费逻辑(比如拿到消息后写HDFS或HBASE这种逻辑就挺重的),看看是否可以优化consumer TPS;3. 确保consumer端没有因为异常而导致消费hang住; 4. 如果你使用的是消费者组,确保没有频繁地发生rebalance 主要排查下可能是哪些原因

    共 17 条评论
    79
  • 王槐铤
    2019-06-30
    onSend传null会在KafkaProducer类中调用doSend时引发NPE,并通过 ProducerInterceptors.onSendError 方法传导至onAcknowledgement,以及throw到用户编写的Producer中。
    21
  • 落霞与孤鹜
    2019-06-30
    这个问题和这期的内容没关系😓。 如果一个主题,由一个应用的名为A的消费组消费,然后把消费组名改为B,重新发布应用,这个时候是不是从主题的分区头开始消费?如何保证从上次A消费组的最新偏移量处开始消费?

    作者回复: 我假设你指的名字是group.id。那么把A改成B对于Kafka而言就是新的consumer。新consumer从头还是从最新开始消费取决于auto.offset.reset的设置

    共 3 条评论
    20
  • 李先生
    2020-04-10
    胡哥: consumer消费:比如异步发积分,发积分的消息进入kafka,加积分服务监听kafka的topic,为了避免重复消费,加积分服务获取到消息后先写入mysql,并利用mysql的唯一索引的能力来避免重复消费,然后加积分服务异步去执行mysql中的信息去实现加积分。这种实现方案会导致消费性能低下,但是写入mysql一是避免重复消费,二是做一条成功的记录(便于后期查询)。这种如何优化呢
    展开

    作者回复: 如果只是这样使用,我倒是不建议用MySQL来做去重,你还不如在应用层面自行去重。一定要用的话,不妨试试把MySQL表为topic key做分区表吧

    12
  • 张庆
    2019-07-02
    return null ; 报错了,NullPointException错误,KafkaProducer doSend方法里面获取消息主题和分区。
    共 1 条评论
    11
  • Lei@时速云
    2019-06-29
    👍 胡总出专栏了

    作者回复: 磊总别闹:)

    8
  • 打码的土豆
    2019-07-01
    老师你好 最近在看kafka官方文档时有一处没看懂还望老师指教 All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel's pagecache 这上面说的文件系统上的持久日志为什么会是pagecache pagecache不是内存的一部分吗

    作者回复: 它这句话的意思是不需要用户手动调用flush来刷盘,由os自己来做:)

    共 2 条评论
    7
  • IT小僧
    2020-01-13
    老师好!这个消费者段onConsume是在消费数据之前执行的方法,并不能真正统计实际处理该消费耗费时间的吧。我觉得理应放在onCommit里面。

    作者回复: 是的,你是对的~ 其实放在哪个方法里面取决于你的计时逻辑:)

    6
  • 小马
    2020-01-09
    老师有两个疑问请教下: 1、时间的一致性问题。System.currentTimeMillis() - record.timestamp()发送和接收的时间实际上可能来自两台机器,有可能时间不一致,会导致统计结果偏差很大; 2、消费端代码计算时间差是在循环里面进行的,把System.currentTimeMillis()提到循环外面应该会好一点吧,毕竟这一批消息应该算是同时接收的; 3、消息总数是在生产端统计的,时延是在消费端统计的,但是如果在消息传输过程中出现部分消息丢失是不是会影响统计的准确性。
    展开

    作者回复: 1. 确实有这个问题。用户自己来规避之 2. 同意~ 3. 我们还是能保证消息不丢失的吧:)

    共 4 条评论
    6
  • 2019-08-14
    胡老师好,请教两个小问题 1:broker通过网络拿到消息后,落盘的规则是什么?来一条消息就落盘一条?还是积攒够一定的量再落盘? 2:从追主,新的消息是主主动推给各个从的?还是从主动拉取的?如果是主动拉取,从怎么知道主有新的消息的?另外,同步的时候是只要有一条新消息就同步嘛? 如果其他同学清楚,也请帮忙解答一下,多谢。
    展开

    作者回复: 1. 实际上这个交由OS来决定,Kafka不管 2. 从拉取的方式。Kafka定义了水位的概念来标识拉取的进度

    共 3 条评论
    6
  • 周曙光爱学习
    2020-04-09
    拦截器的确是很有用,我们在grpc的拦截器中做限流处理。同理,由于下游存储TPS能力有限,也完全可以在kafka消费的拦截器中做消费限流处理,防止把存储打挂

    作者回复: 嗯嗯,算是一个不太常见功能的典型应用了:)

    4
  • 进击的姬哥
    2019-11-23
    Interceptor处理数据是单条的吗,还是多条数据作为一个集合

    作者回复: 单条消息

    4
  • James
    2019-11-13
    请问老师无法完成提交,是因为重新平衡,是什么原因才会导致. 刚接触不久,就要修改线上环境问题.但是一直跑了一小时就会下面问题,然后oom Group coordinator rl-hadoop5.gci.com:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery Offset commit failed on partition dispatch_bus-1 at offset 28978632: The coordinator is not aware of this member. Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    展开

    作者回复: 我的经验是先解决OOM的问题吧。至于commit 失败,通常是因为事件处理的速度太慢了

    共 7 条评论
    4
  • Roy Liang
    2020-04-14
    老师你好,onAcknowledgement不应该是消息ACK后吗?怎么是提交成功后呢?提交成功后感觉应该属于callback的

    作者回复: hmmm.... 我的意思是消息被broker端应答就表示消息被提交成功了。

    3
  • Liam
    2019-06-29
    请问下老师。onsend或onconsumer的执行线程和发送或消费消息的线程是否是同一个线程?加入太多耗时逻辑是否会影响主逻辑?

    作者回复: onsend是producer进程下的线程;onConsume是consumer进程下的线程,它们不是一个进程。我说的是onSend和onAcknowledgement是一个进程下的多个线程。

    3
  • 振超
    2019-06-29
    Producer 发送的是被 ProducerInterceptor 修改后的消息,返回 null 也是一种修改的行为,所以 kafka 不应该对这种情况特殊对待。不过将 null 发送到服务端没有意义,实际执行会出现 NPE,不过异常最终会被捕获传递给 ProducerInterceptor 的 onAcknowledgement 方法。
    2
  • 小马哥
    2021-05-18
    结果: KafkaProducer.doSend(KafkaProducer.java:876)抛出java.lang.NullPointerException异常 原理解释: 与Javaweb的拦截器一样, 被自定义的拦截器拦截之后, 在生产者doSend之前没有将消息放行, 而是放行了null, 生产者doSend(null)抛异常
    1
  • 双椒叔叔
    2020-07-13
    胡老师,我看了源码后发现 1038,1037,1029-----reassign----->1038,1037,1048 迁移计划卡住的那步是这样的 首先1029副本的状态从replica中移除流程是 controller先把1029offline, 然后 controller发送状态改变请求给1029 1、first move the replica to offline state (the controller removes it from the ISR) 2、send stop replica command to the old replicas 3、Eventually partition reassignment could use a callback that does retries if deletion failed 然后返回一个回调状态值NonExistentReplica(因为1029现在是死了的状态)。 那么这种情况如何解决呢? 是直接在zk中修改该主题的问题分区(执行迁移计划卡主的那个分区)吗?
    展开
    共 1 条评论
    2
  • 双椒叔叔
    2020-07-13
    分区迁移遇到的问题怎么解决呢 1038,1037,1029-----reassign----->1038,1037,1048 迁移计划卡死了 replica变成1038,1037,1048,1029了,ISR变成了1038,1048 其实就是1029机子先宕掉了,我想要把死掉的1029机子上的副本迁移到1048上 现在卡死了,一直in progress 我想要在replica中remove1029,但是我看了源码发现是状态机维护每一个replica,自己没办法删除,所以现在没辙了。。。
    展开
    1
  • 飞猪环游记
    2020-04-08
    老师,你好,消费端拦截器里接收到的多条消息是同一条吗?想在拦截器发送前,消费前对同一条消息做些处理可以实现吗

    作者回复: 不同同一条,是一批消息。onConsume方法就是在poll返回消息前调用的,你可以在这个方法中加入你的处理逻辑。

    1