30 | 怎么重设消费者组位移?
30 | 怎么重设消费者组位移?
讲述:胡夕
时长08:55大小8.17M
为什么要重设消费者组位移?
重设位移策略
消费者 API 方式设置
命令行方式设置
小结
开放讨论
赞 8
提建议
精选留言(67)
- 小可爱2019-08-29current是回到最近提交位移处,但是消费者不是本来就从最近提交处继续消费吗
作者回复: current主要是为了调试场景。比如这样的场景:你停掉了consumer,现在offset=50,然后修改了consumer代码重新上线,consumer开始从50消费,运行了一段时间发现你修改的代码有问题,还要继续改,那么下掉consumer,将offset调回current,改代码之后再上线,consumer从50消费。此时current策略就显得很方便了,对吧?
共 8 条评论34 - QQ怪2019-08-10比较暴力的重新开个消费组从头消费😂共 3 条评论18
- Curry2020-05-05老师,为什么要poll(0)一下?是为了获取元数据吗?
作者回复: 是的,去拿元数据
共 2 条评论16 - 喜欢地球的阿培同学2020-05-27老师,问一个问题: 像RocketMQ这样的消息引擎,如果消费者消费某条消息一直失败,会将这条消息放到 “死信”队列里,然后消费者继续消费下一条消息。在kafka中,如果消费者消费某条消息一直失败,会怎么处理呢?难道程序会一直消费这条消息,然后失败.. 继续消费这条消息 .. 然后继续失败 ......
作者回复: consumer端可以选择跳过该消息。的确这方面Kafka没有提供开箱即用的dead letter queue~
共 2 条评论10 - 水天一色2019-12-17请问,重置offset到 datetime,这个 datetime 是生产时间还是当前group的消费时间?
作者回复: 消息的创建时间
共 2 条评论7 - cricket19812019-08-12"最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))。"--- 能讲下为什么吗?如果不遵守会怎么样?
作者回复: 两个的实现方式不一样。详细设计原理差别可以看看:https://www.cnblogs.com/huxi2b/p/10773559.html
共 5 条评论6 - Li Shunduo2019-08-12试了下开着console consumer的时候去调整offset,遇到以下错误: Error: Assignments can only be reset if the group 'test_group' is inactive, but the current state is Stable. 停掉console consumer之后,就可以调整offset了。 好像不能动态调整?
作者回复: 嗯,必须是非active的group才行
6 - cuiyunfeng2019-09-06重置位移前,执行consumer.poll(),可以让kafka强制更新担当的partition信息,以防止发生kafka rebalance后partition信息陈旧,然后再取得partition信息进行offset位移。或者也可以配合使用kafka listener来处理发生rebalance情况下,进行重置位移。共 1 条评论5
- What for2019-08-31请问一下老师在重设位移之前为什么要调用 consumer.poll 方法?是为了连接集群吗?
作者回复: 嗯嗯,你可以这么认为,其实还有其他作用,比如获取集群信息后台主题数据等
4 - James2020-07-02老师,你好.存在以下问题,麻烦解答下: 1.对文中解释的Latest与Current感觉区分不清楚,老能能详细举例介绍下; 文中是禁止提交位移,那么这两个应该都是一样,最新的位移位置. 要是可以提交位移呢;不是特别懂 2.第一条评论中,current主要是为了调试场景,要是有提交位移(50->100),那么重新上线使用current策略,最新提交位移是不是100;展开
作者回复: Latest 策略表示把位移重设成最新末端位移,也就是LEO。Current 策略表示将位移调整成消费者当前提交的最新位移,是消费者已经提交的位移,后者必然小于等于LEO
共 2 条评论3 - 无菇朋友2019-09-18老师 问一下 如果 我想针对某个分区重置位移,怎么做
作者回复: KafkaConsumer.seek方法支持指定单个分区进行重设位移
3 - JasonZhi2019-08-30老师,不是还可以通过auto offset reset配置项重设位移吗?怎么这里没有说
作者回复: 这是说的是手动设置位移的情况,自动设置位移是Kafka自动做的。当然也算是重设位移的一种
3 - 无菇朋友2019-08-26请问老师,current这个选项的应用场景是什么?
作者回复: 如果你上线的consumer程序有bug,需要重演自上线起处理过的消息,那么可以考虑使用这个策略
3 - 锋芒2019-08-23请问,用命令行重设位移,应该在当前group 的leader 节点上?
作者回复: 不需要的
3 - Curry2020-05-05请问如何不停止程序去指定位移呢?
作者回复: 目前可以通过Consumer API的seek方法来实现
共 3 条评论2 - helloworld2020-03-16这里的--all-topics是什么含义呢?为什么Shift-By-N、DateTime与Duration均没有?是不是可以这么认为:通过脚本的方式设置位移只能设置Topic的所有位移,并且所设置的位移会使各个分区都一样?而不能单独设置某个分区的位移??
作者回复: 使用consumer-groups命令可以设置单个分区的位移,文中只是用--all-topics举个例子。比如指定--topics topic1:0,1,2
共 2 条评论2 - chp2020-03-01老师,为啥我指定消费者位移没效果 ------------------------------------------------- String topic = "foo"; try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) { consumer.subscribe(Collections.singleton(topic)); ConsumerRecords consumerRecords = consumer.poll(0); Iterable<ConsumerRecord<String, String>> iterable = consumerRecords.records("foo"); for (Iterator iterator = iterable.iterator(); iterator.hasNext();) { ConsumerRecord<String, String> consumerRecord = (ConsumerRecord)iterator.next(); log.info("topic = {}, partition = {}, offset = {}, key = {}, value = {}\n", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()); } for (PartitionInfo info : consumer.partitionsFor(topic)) { TopicPartition tp = new TopicPartition(topic, info.partition()); consumer.seek(tp, 3L); } } ---------------------------------------------------------- 没报错,就是设置了位移后,offset没变化展开
作者回复: 后面有执行poll方法了没?
2 - 归零2021-02-22老师,文中说你要创建的消费者程序,要禁止自动提交位移。为什么重设位移需要禁止自动提交呢?二者有什么关系吗?谢谢🙏
作者回复: 并不是说重设位移和禁止自动提交一定是互斥的。只是你既然要重设位移说明你要亲自对位移进行管理,那么最好禁掉自动提交
1 - 尚小树2020-09-26老师好。我遇到个问题,在消费消息过程中要查询数据库中的记录,但是消息来的时候库中记录不一定存在,大概延时0-30秒,这样就导致消费逻辑不成功。经讨论保证记录在消息前写入不可行。 所以只能重复消费,我用的办法开个线程睡一会再重新把不成功的消息写回topic中,如果几次不成功就记录错误日志不会永远循环。 想问老师这样做有没有什么问题,有没有更好的解决方案呢☺️谢谢老师。展开
作者回复: 这取决于记录不存在的比例。是大部分时候都不存在还是只有偶发的不存在。如果是后者可以考虑将不存在的零星消息缓存起来以待后面重新处理。如果是前者,则需要通盘考虑消费逻辑。
共 2 条评论1 - 对与错2020-07-30请问ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"这个配置和seek()方法之间有关系吗?比如我设置的是earliest,但是使用的是seekToEnd
作者回复: ConsumerConfig.AUTO_OFFSET_RESET_CONFIG是出现位移重设置时的策略。平时位移设置的策略不取决于这个参数
1