20 | 多线程开发消费者实例
20 | 多线程开发消费者实例
讲述:胡夕
时长11:34大小10.59M
Kafka Java Consumer 设计原理
多线程方案
实现代码示例
小结
开放讨论
赞 10
提建议
精选留言(69)
- yhh2019-07-18希望老师能讲讲方案2下线程池怎么管理和提交位移!!共 19 条评论77
- 小生向北2019-07-18能够用多线程解决的就不要用多进程,毕竟资源有限。方案2的讲解还是太浅了,同希望老师能针对方案2详细讲解一下!方案2里面在异步线程里提交offset,每个线程自己提交自己的,如果中间有offset提交失败,后面的offset又提交成功了咋办呢?而且每个线程都自己提交consumer.commit()就意味着要在多个线程里面使用consumer,如文中所说,这种情况是要报CME错误的,那究竟该如何正确的提交呢,有没有最佳实践呀?共 4 条评论40
- james2019-07-18方案2最核心的如何commit老师没有说,难道只能启用自动提交吗?我觉得可以用Cyclicbarrier来实现线程池执行完毕后,由consumer来commit,不用countdownlatch因为它只能记录一次,而cb可以反复用,或者用forkjoin方式,总之要等待多线程都处理完才能commit,风险就是某个消息处理太慢回导致整体都不能commit,而触发rebalance以及重复消费,而重复消费我用布隆过滤器来解决共 4 条评论24
- QQ怪2019-07-18老师能否加餐spring-kafka相关知识共 2 条评论25
- 注定非凡2019-11-05A :Kafka Java Consumer是单线程设计原理。 (1)在Kafka从0.10.1.0版本开始,KafkaConsumer就变成双线程设计即:用户主线程和心跳线程。 (2)主线程是指:启动Consumer应用程序main方法的那个线程,而新引入的心跳线程只负责定期给对应的Broker机器发送心跳请求,以标识消费者应用的存活性。 (2)老版本中有Scala Consumer的API,是多线程架构的,每个Consumer实例在内部为所有订阅的主题分区创建对应消息获取线程,也称为Fetcher线程。老版本Consumer同时也是阻塞式的(blocking),Consumer实例启动后,内部会创建很多阻塞式的消息迭代器。 (3)在很多场景下,Consumer端是有非阻塞需求的,如流处理应用中执行过滤(filter),连接(join),分组(group by)等操作时就不能是阻塞式的。 所以,新版本Consumer设计了单线程+轮询的机制。这种设计能够较好的实现非阻塞式的消息获取。 B :单线程设计优点 (1)单线程可以较好的实现如在流处理应用中执行过滤(filter),连接(join),分组(group by)等操作。 (2)单线程能够简化Consumer端设计。Consumer端获取到消息后,处理消息的逻辑是否采用多线程,由自己决定。 (3)单线程设计在很多种编程中都比较易于实现,编译社区移植。 C :多线程方案 (1)KafkaConsumer类不是线程安全的(thread-safe)。所有的网络I/O处理都是发生在用户主线程中,所以不能在多线程中共享同一个KafkaConsumer实例,否则程序会抛ConcurrentModificationException异常。 (2)方案一: 消费者程序启动多个线程,每个线程维护专属的KafkaConsumer实例,负责完整的消息获取,消息处理流程。 优点: 方便实现,速度快,无线程间交互开销,易于维护分区的消息顺序 缺点: 占用更多的系统资源,线程数受限于主题分区数,扩展性差。线程自己处理消息容易超时,进而引发Rebalance。 (3)方案二: 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是多个,每个线程维护专属的KafkaConsumer实例,处理消息则交由特定的线程池来做。 优点: 可独立扩展消费获取线程数和worker线程数,伸缩性好 缺点: 难以维护分区内的消息消费顺序,处理链路拉长,不易于位移提交管理,实现难度高。展开21
- calljson2019-07-18希望老师能对比spring-kafka源码,关于多线程管理consumer谢谢22
- 千屿2019-07-20最近用spring cloud做了一个kafka可靠消息微服务组件,有兴趣的朋友可以看看 ,消费端是多线程模型,消费线程和业务执行分离,使用了mongodb(分片+副本集) 存储消息发送的链路,对发送失败的消息做了补偿机制。https://gitee.com/huacke/mq-kafka,有问题可以联系我。20
- 寂静欢喜2019-11-27老师 想问下 心跳线程是和主线程分开的,那么 第一种方案中,主线程阻塞,又怎么会导致超时Rebalance呢?
作者回复: 应该这么说,心跳线程会定期地检查前端线程是否卡住了,一旦发现卡住了,便会发起主动离组。
共 3 条评论20 - 玉剑冰锋2019-07-18Kafka重启时间比较长,每次重启一台差不多四五十分钟,日志保存12个小时,每台数据量差不多几个T,想请教一下老师有什么可以优化的参数吗?
作者回复: 有可能是要加载的日志段数据太多导致的,可以增加num.recovery.threads.per.data.dir的值
共 4 条评论18 - 来碗绿豆汤2019-07-22对于第二种方案,可以添加一个共享的队列,消费线程消费完一个记录,就写入队列,然后主线程可以读取这个队列,然后依次提交小号的offset共 1 条评论14
- yic2020-02-12老师,关于方案2中的做法,位移提交是有重复消费消息和丢失数据的风险的,有没有什么好的实践呀?
作者回复: 最好的办法就是自己完全实现一套多线程+管理offset的方案,就像Spark Streaming和Flink做的那样。有兴趣的话可以阅读以下Flink中Kafka Connector的源代码:)
共 2 条评论8 - z.l2019-07-20请教个问题,如果使用方案1,一个consumer group订阅了2个topic,每个topic都是24个分区,此时最大线程数可以设置为24还是48?
作者回复: 理论上是48,但实际上这么多线程反而是开销,可以采用多进程的方式
8 - KEEPUP2019-07-19希望老师讲一下sparkstreaming 消费kafka 消息的情况7
- 飞翔2019-12-04老师 想问一个方案1 谁负责分配线程给每个partition呀 我看您的code 只是没产生一个线程去消费一个主题 如果我有4个parition 那么我产生4个线程来消费这个主题,他会自动均匀分配嘛
作者回复: “谁负责分配线程给每个partition呀” --- leader consumer负责分配。 会均匀分配的,这是kafka consumer代码保证的
7 - 张洋2020-05-19老师如果当前consumer group下的consumer instance 只分配了当前主题的一个分区是不是意味着 当前也只能是一个线程来消费消息了
作者回复: 取决于consumer instance是线程还是进程。通常情况下如果consumer instance是进程的话,还是可以使用多个线程来消费这个获取到的数据。
5 - 高志强2019-12-25老师我现在用Php多进程消费,一个topic 130个分区,我是不是该启动130个进程去消费,目前启动64个进程,但消费能力上不去,消息积压量有几十万了,怎么才能提高消费能力呢
作者回复: 可以考虑单个进程下再开多线程的方式来增强消费能力,不必一味考虑多进程的方案
共 4 条评论4 - Hale2019-12-24如果只有一个broker,一个consumer 一个分区,上面的consumer 组成一个组,一个topic 当consumer 卡住时,协调器会将消费者踢出消费组,进行重新分区分配,但只有一个消费者,那消费者就不能接受到数据了,怎样实现消费者重连
作者回复: 消费者会自动重连的,如果重连失败,说明网络有问题
4 - YWH2019-12-16老师,想请教消费者的一个问题... 我们的业务场景是这样的:建立一个服务接收 http 请求、根据传入的参数(topic)从 Kafka 指定 topic 拉取一定数量的消息后返回。但 Kafka 的消费者是要保持轮询的,不然就只能每次建立消费者、获取分区/加入群组、请求数据后关闭消费者(但这样效率很低)。 请问有什么比较好又可靠的实现方法吗?谢谢~
作者回复: 持续消费一部分消息缓存到本地,http接口从本地读取消息。如果长时间不拉取,consumer先pause消费
4 - 胡家鹏2019-10-23老师及各位朋友好,问下两个问题1.上面的代码怎么没有消费位移提交,难道是设置的自动提交位移吗?2.consumer.wakeup什么时候使用,来解决什么问题呢?
作者回复: 1. 您指哪段代码?另外如果设置了enable.auto.commit=true或没有显式设置enable.auto.commit=false,就是自动提交 2. wakeup主要用于唤醒polling中的consumer实例。如果你使用了多线程(即把KafkaConsumer实例用于单独的线程),你需要有能力在另一个线程中“中断”KafkaConsumer所在实例的执行。wakeup就是用这个的
共 4 条评论4 - Xiao2019-07-18胡老师,第二种方案我觉得还有个问题就是如果是自动提交,那就会容易出现消息丢失,因为异步消费消息,如果worker线程有异常,主线程捕获不到异常,就会造成消息丢失,这个是不是还需要做补偿机制;如果是手动提交,那offer set也会有可能丢失,消息重复消费,消息重复还好处理,做幂等就行。4