12 | 客户端都有哪些不常见但是很高级的功能?
12 | 客户端都有哪些不常见但是很高级的功能?
讲述:胡夕
时长11:03大小8.86M
什么是拦截器?
Kafka 拦截器
典型使用场景
案例分享
小结
开放讨论
赞 20
提建议
精选留言(51)
- 风中花2019-07-04胡老师您好! 我公司现在就我一个人懂点kafka,但是公司线下却有使用kafka,现在知道我学习这个就交给我了,我现在遇到一个线上问题:消息经常堆积起来,不能消费了,重启服务就能继续消费了。我目前得能力还搞不定,望老师能给指点一二 。谢谢。谢谢
作者回复: 消息堆积可能原因如下:1. 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS;2. consumer消费性能低,查一下是否有很重的消费逻辑(比如拿到消息后写HDFS或HBASE这种逻辑就挺重的),看看是否可以优化consumer TPS;3. 确保consumer端没有因为异常而导致消费hang住; 4. 如果你使用的是消费者组,确保没有频繁地发生rebalance 主要排查下可能是哪些原因
共 17 条评论79 - 王槐铤2019-06-30onSend传null会在KafkaProducer类中调用doSend时引发NPE,并通过 ProducerInterceptors.onSendError 方法传导至onAcknowledgement,以及throw到用户编写的Producer中。21
- 落霞与孤鹜2019-06-30这个问题和这期的内容没关系😓。 如果一个主题,由一个应用的名为A的消费组消费,然后把消费组名改为B,重新发布应用,这个时候是不是从主题的分区头开始消费?如何保证从上次A消费组的最新偏移量处开始消费?
作者回复: 我假设你指的名字是group.id。那么把A改成B对于Kafka而言就是新的consumer。新consumer从头还是从最新开始消费取决于auto.offset.reset的设置
共 3 条评论20 - 李先生2020-04-10胡哥: consumer消费:比如异步发积分,发积分的消息进入kafka,加积分服务监听kafka的topic,为了避免重复消费,加积分服务获取到消息后先写入mysql,并利用mysql的唯一索引的能力来避免重复消费,然后加积分服务异步去执行mysql中的信息去实现加积分。这种实现方案会导致消费性能低下,但是写入mysql一是避免重复消费,二是做一条成功的记录(便于后期查询)。这种如何优化呢展开
作者回复: 如果只是这样使用,我倒是不建议用MySQL来做去重,你还不如在应用层面自行去重。一定要用的话,不妨试试把MySQL表为topic key做分区表吧
12 - 张庆2019-07-02return null ; 报错了,NullPointException错误,KafkaProducer doSend方法里面获取消息主题和分区。共 1 条评论11
- Lei@时速云2019-06-29👍 胡总出专栏了
作者回复: 磊总别闹:)
8 - 打码的土豆2019-07-01老师你好 最近在看kafka官方文档时有一处没看懂还望老师指教 All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel's pagecache 这上面说的文件系统上的持久日志为什么会是pagecache pagecache不是内存的一部分吗
作者回复: 它这句话的意思是不需要用户手动调用flush来刷盘,由os自己来做:)
共 2 条评论7 - IT小僧2020-01-13老师好!这个消费者段onConsume是在消费数据之前执行的方法,并不能真正统计实际处理该消费耗费时间的吧。我觉得理应放在onCommit里面。
作者回复: 是的,你是对的~ 其实放在哪个方法里面取决于你的计时逻辑:)
6 - 小马2020-01-09老师有两个疑问请教下: 1、时间的一致性问题。System.currentTimeMillis() - record.timestamp()发送和接收的时间实际上可能来自两台机器,有可能时间不一致,会导致统计结果偏差很大; 2、消费端代码计算时间差是在循环里面进行的,把System.currentTimeMillis()提到循环外面应该会好一点吧,毕竟这一批消息应该算是同时接收的; 3、消息总数是在生产端统计的,时延是在消费端统计的,但是如果在消息传输过程中出现部分消息丢失是不是会影响统计的准确性。展开
作者回复: 1. 确实有这个问题。用户自己来规避之 2. 同意~ 3. 我们还是能保证消息不丢失的吧:)
共 4 条评论6 - 钱2019-08-14胡老师好,请教两个小问题 1:broker通过网络拿到消息后,落盘的规则是什么?来一条消息就落盘一条?还是积攒够一定的量再落盘? 2:从追主,新的消息是主主动推给各个从的?还是从主动拉取的?如果是主动拉取,从怎么知道主有新的消息的?另外,同步的时候是只要有一条新消息就同步嘛? 如果其他同学清楚,也请帮忙解答一下,多谢。展开
作者回复: 1. 实际上这个交由OS来决定,Kafka不管 2. 从拉取的方式。Kafka定义了水位的概念来标识拉取的进度
共 3 条评论6 - 周曙光爱学习2020-04-09拦截器的确是很有用,我们在grpc的拦截器中做限流处理。同理,由于下游存储TPS能力有限,也完全可以在kafka消费的拦截器中做消费限流处理,防止把存储打挂
作者回复: 嗯嗯,算是一个不太常见功能的典型应用了:)
4 - 进击的姬哥2019-11-23Interceptor处理数据是单条的吗,还是多条数据作为一个集合
作者回复: 单条消息
4 - James2019-11-13请问老师无法完成提交,是因为重新平衡,是什么原因才会导致. 刚接触不久,就要修改线上环境问题.但是一直跑了一小时就会下面问题,然后oom Group coordinator rl-hadoop5.gci.com:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery Offset commit failed on partition dispatch_bus-1 at offset 28978632: The coordinator is not aware of this member. Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.展开
作者回复: 我的经验是先解决OOM的问题吧。至于commit 失败,通常是因为事件处理的速度太慢了
共 7 条评论4 - Roy Liang2020-04-14老师你好,onAcknowledgement不应该是消息ACK后吗?怎么是提交成功后呢?提交成功后感觉应该属于callback的
作者回复: hmmm.... 我的意思是消息被broker端应答就表示消息被提交成功了。
3 - Liam2019-06-29请问下老师。onsend或onconsumer的执行线程和发送或消费消息的线程是否是同一个线程?加入太多耗时逻辑是否会影响主逻辑?
作者回复: onsend是producer进程下的线程;onConsume是consumer进程下的线程,它们不是一个进程。我说的是onSend和onAcknowledgement是一个进程下的多个线程。
3 - 振超2019-06-29Producer 发送的是被 ProducerInterceptor 修改后的消息,返回 null 也是一种修改的行为,所以 kafka 不应该对这种情况特殊对待。不过将 null 发送到服务端没有意义,实际执行会出现 NPE,不过异常最终会被捕获传递给 ProducerInterceptor 的 onAcknowledgement 方法。2
- 小马哥2021-05-18结果: KafkaProducer.doSend(KafkaProducer.java:876)抛出java.lang.NullPointerException异常 原理解释: 与Javaweb的拦截器一样, 被自定义的拦截器拦截之后, 在生产者doSend之前没有将消息放行, 而是放行了null, 生产者doSend(null)抛异常1
- 双椒叔叔2020-07-13胡老师,我看了源码后发现 1038,1037,1029-----reassign----->1038,1037,1048 迁移计划卡住的那步是这样的 首先1029副本的状态从replica中移除流程是 controller先把1029offline, 然后 controller发送状态改变请求给1029 1、first move the replica to offline state (the controller removes it from the ISR) 2、send stop replica command to the old replicas 3、Eventually partition reassignment could use a callback that does retries if deletion failed 然后返回一个回调状态值NonExistentReplica(因为1029现在是死了的状态)。 那么这种情况如何解决呢? 是直接在zk中修改该主题的问题分区(执行迁移计划卡主的那个分区)吗?展开共 1 条评论2
- 双椒叔叔2020-07-13分区迁移遇到的问题怎么解决呢 1038,1037,1029-----reassign----->1038,1037,1048 迁移计划卡死了 replica变成1038,1037,1048,1029了,ISR变成了1038,1048 其实就是1029机子先宕掉了,我想要把死掉的1029机子上的副本迁移到1048上 现在卡死了,一直in progress 我想要在replica中remove1029,但是我看了源码发现是状态机维护每一个replica,自己没办法删除,所以现在没辙了。。。展开1
- 飞猪环游记2020-04-08老师,你好,消费端拦截器里接收到的多条消息是同一条吗?想在拦截器发送前,消费前对同一条消息做些处理可以实现吗
作者回复: 不同同一条,是一批消息。onConsume方法就是在poll返回消息前调用的,你可以在这个方法中加入你的处理逻辑。
1