极客时间已完结课程限时免费阅读

30 | 怎么重设消费者组位移?

30 | 怎么重设消费者组位移?-极客时间

30 | 怎么重设消费者组位移?

讲述:胡夕

时长08:55大小8.17M

你好,我是胡夕。今天我要跟你分享的主题是:如何重设消费者组位移。

为什么要重设消费者组位移?

我们知道,Kafka 和传统的消息引擎在设计上是有很大区别的,其中一个比较显著的区别就是,Kafka 的消费者读取消息是可以重演的(replayable)。
像 RabbitMQ 或 ActiveMQ 这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从 Broker 上删除。
反观 Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。
对了,之前有很多同学在专栏的留言区提问:在实际使用场景中,我该如何确定是使用传统的消息中间件,还是使用 Kafka 呢?我在这里统一回答一下。如果在你的场景中,消息处理逻辑非常复杂,处理代价很高,同时你又不关心消息之间的顺序,那么传统的消息中间件是比较合适的;反之,如果你的场景需要较高的吞吐量,但每条消息的处理时间很短,同时你又很在意消息的顺序,此时,Kafka 就是你的首选。

重设位移策略

不论是哪种设置方式,重设位移大致可以从两个维度来进行。
位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。
时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。
下面的这张表格罗列了 7 种重设策略。接下来,我来详细解释下这些策略。
Earliest 策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除,所以当前最早位移很可能是一个大于 0 的值。如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略
Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息,那么最新末端位移就是 15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略。
Current 策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能。
表中第 4 行的 Specified-Offset 策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理。在实际使用过程中,可能会出现 corrupted 消息无法被消费的情形,此时消费者程序会抛出异常,无法继续工作。一旦碰到这个问题,你就可以尝试使用 Specified-Offset 策略来规避。
如果说 Specified-Offset 策略要求你指定位移的绝对数值的话,那么 Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100。
刚刚讲到的这几种策略都是位移维度的,下面我们来聊聊从时间维度重设位移的 DateTime 和 Duration 策略。
DateTime 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点。
Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 类的话,你应该不会对这个格式感到陌生。它就是一个符合 ISO-8601 规范的 Duration 格式,以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S,分别表示天、小时、分钟和秒。举个例子,如果你想将位移调回到 15 分钟前,那么你就可以指定 PT0H15M0S。
我会在后面分别给出这 7 种重设策略的实现方式。不过在此之前,我先来说一下重设位移的方法。目前,重设消费者组位移的方式有两种。
通过消费者 API 来实现。
通过 kafka-consumer-groups 命令行脚本来实现。

消费者 API 方式设置

首先,我们来看看如何通过 API 的方式来重设位移。我主要以 Java API 为例进行演示。如果你使用的是其他语言,方法应该是类似的,不过你要参考具体的 API 文档。
通过 Java API 的方式来重设位移,你需要调用 KafkaConsumer 的 seek 方法,或者是它的变种方法 seekToBeginning 和 seekToEnd。我们来看下它们的方法签名。
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
根据方法的定义,我们可以知道,每次调用 seek 方法只能重设一个分区的位移。OffsetAndMetadata 类是一个封装了 Long 型的位移和自定义元数据的复合类,只是一般情况下,自定义元数据为空,因此你基本上可以认为这个类表征的主要是消息的位移值。seek 的变种方法 seekToBeginning 和 seekToEnd 则拥有一次重设多个分区的能力。我们在调用它们时,可以一次性传入多个主题分区。
好了,有了这些方法,我们就可以逐一地实现上面提到的 7 种策略了。我们先来看 Earliest 策略的实现方式,代码如下:
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
String topic = "test"; // 要重设位移的Kafka主题
try (final KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(consumerProperties)) {
consumer.subscribe(Collections.singleton(topic));
consumer.poll(0);
consumer.seekToBeginning(
consumer.partitionsFor(topic).stream().map(partitionInfo ->
new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
}
这段代码中有几个比较关键的部分,你需要注意一下。
你要创建的消费者程序,要禁止自动提交位移。
组 ID 要设置成你要重设的消费者组的组 ID。
调用 seekToBeginning 方法时,需要一次性构造主题的所有分区对象。
最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))。
虽然社区已经不推荐使用 poll(long) 了,但短期内应该不会移除它,所以你可以放心使用。另外,为了避免重复,在后面的实例中,我只给出最关键的代码。
Latest 策略和 Earliest 是类似的,我们只需要使用 seekToEnd 方法即可,如下面的代码所示:
consumer.seekToEnd(
consumer.partitionsFor(topic).stream().map(partitionInfo ->
new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
实现 Current 策略的方法很简单,我们需要借助 KafkaConsumer 的 committed 方法来获取当前提交的最新位移,代码如下:
consumer.partitionsFor(topic).stream().map(info ->
new TopicPartition(topic, info.partition()))
.forEach(tp -> {
long committedOffset = consumer.committed(tp).offset();
consumer.seek(tp, committedOffset);
});
这段代码首先调用 partitionsFor 方法获取给定主题的所有分区,然后依次获取对应分区上的已提交位移,最后通过 seek 方法重设位移到已提交位移处。
如果要实现 Specified-Offset 策略,直接调用 seek 方法即可,如下所示:
long targetOffset = 1234L;
for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
consumer.seek(tp, targetOffset);
}
这次我没有使用 Java 8 Streams 的写法,如果你不熟悉 Lambda 表达式以及 Java 8 的 Streams,这种写法可能更加符合你的习惯。
接下来我们来实现 Shift-By-N 策略,主体代码逻辑如下:
for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
// 假设向前跳123条消息
long targetOffset = consumer.committed(tp).offset() + 123L;
consumer.seek(tp, targetOffset);
}
如果要实现 DateTime 策略,我们需要借助另一个方法:KafkaConsumer. offsetsForTimes 方法。假设我们要重设位移到 2019 年 6 月 20 日晚上 8 点,那么具体代码如下:
long ts = LocalDateTime.of(
2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map<TopicPartition, Long> timeToSearch =
consumer.partitionsFor(topic).stream().map(info ->
new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> ts));
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
这段代码构造了 LocalDateTime 实例,然后利用它去查找对应的位移值,最后调用 seek,实现了重设位移。
最后,我来给出实现 Duration 策略的代码。假设我们要将位移调回 30 分钟前,那么代码如下:
Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
.map(info -> new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000 * 60));
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
总之,使用 Java API 的方式来实现重设策略的主要入口方法,就是 seek 方法

命令行方式设置

位移重设还有另一个重要的途径:通过 kafka-consumer-groups 脚本。需要注意的是,这个功能是在 Kafka 0.11 版本中新引入的。这就是说,如果你使用的 Kafka 是 0.11 版本之前的,那么你只能使用 API 的方式来重设位移。
比起 API 的方式,用命令行重设位移要简单得多。针对我们刚刚讲过的 7 种策略,有 7 个对应的参数。下面我来一一给出实例。
Earliest 策略直接指定 --to-earliest
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
Latest 策略直接指定 --to-latest
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
Current 策略直接指定 --to-current
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
Specified-Offset 策略直接指定 --to-offset
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
Shift-By-N 策略直接指定 --shift-by N
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
DateTime 策略直接指定 --to-datetime
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
最后是实现 Duration 策略,我们直接指定 --by-duration
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

小结

至此,重设消费者组位移的 2 种方式我都讲完了。我们来小结一下。今天,我们主要讨论了在 Kafka 中,为什么要重设位移以及如何重设消费者组位移。重设位移主要是为了实现消息的重演。目前 Kafka 支持 7 种重设策略和 2 种重设方法。在实际使用过程中,我推荐你使用第 2 种方法,即用命令行的方式来重设位移。毕竟,执行命令要比写程序容易得多。但是需要注意的是,0.11 及 0.11 版本之后的 Kafka 才提供了用命令行调整位移的方法。如果你使用的是之前的版本,那么就只能依靠 API 的方式了。

开放讨论

你在实际使用过程中,是否遇到过要重设位移的场景,你是怎么实现的?
欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。
分享给需要的人,Ta购买本课程,你将得20
生成海报并分享

赞 8

提建议

上一篇
29 | Kafka动态配置了解下?
下一篇
31 | 常见工具脚本大汇总
unpreview
 写留言

精选留言(67)

  • 小可爱
    2019-08-29
    current是回到最近提交位移处,但是消费者不是本来就从最近提交处继续消费吗

    作者回复: current主要是为了调试场景。比如这样的场景:你停掉了consumer,现在offset=50,然后修改了consumer代码重新上线,consumer开始从50消费,运行了一段时间发现你修改的代码有问题,还要继续改,那么下掉consumer,将offset调回current,改代码之后再上线,consumer从50消费。此时current策略就显得很方便了,对吧?

    共 8 条评论
    34
  • QQ怪
    2019-08-10
    比较暴力的重新开个消费组从头消费😂
    共 3 条评论
    18
  • Curry
    2020-05-05
    老师,为什么要poll(0)一下?是为了获取元数据吗?

    作者回复: 是的,去拿元数据

    共 2 条评论
    16
  • 喜欢地球的阿培同学
    2020-05-27
    老师,问一个问题: 像RocketMQ这样的消息引擎,如果消费者消费某条消息一直失败,会将这条消息放到 “死信”队列里,然后消费者继续消费下一条消息。在kafka中,如果消费者消费某条消息一直失败,会怎么处理呢?难道程序会一直消费这条消息,然后失败.. 继续消费这条消息 .. 然后继续失败 ......

    作者回复: consumer端可以选择跳过该消息。的确这方面Kafka没有提供开箱即用的dead letter queue~

    共 2 条评论
    10
  • 水天一色
    2019-12-17
    请问,重置offset到 datetime,这个 datetime 是生产时间还是当前group的消费时间?

    作者回复: 消息的创建时间

    共 2 条评论
    7
  • cricket1981
    2019-08-12
    "最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))。"--- 能讲下为什么吗?如果不遵守会怎么样?

    作者回复: 两个的实现方式不一样。详细设计原理差别可以看看:https://www.cnblogs.com/huxi2b/p/10773559.html

    共 5 条评论
    6
  • Li Shunduo
    2019-08-12
    试了下开着console consumer的时候去调整offset,遇到以下错误: Error: Assignments can only be reset if the group 'test_group' is inactive, but the current state is Stable. 停掉console consumer之后,就可以调整offset了。 好像不能动态调整?

    作者回复: 嗯,必须是非active的group才行

    6
  • cuiyunfeng
    2019-09-06
    重置位移前,执行consumer.poll(),可以让kafka强制更新担当的partition信息,以防止发生kafka rebalance后partition信息陈旧,然后再取得partition信息进行offset位移。或者也可以配合使用kafka listener来处理发生rebalance情况下,进行重置位移。
    共 1 条评论
    5
  • What for
    2019-08-31
    请问一下老师在重设位移之前为什么要调用 consumer.poll 方法?是为了连接集群吗?

    作者回复: 嗯嗯,你可以这么认为,其实还有其他作用,比如获取集群信息后台主题数据等

    4
  • James
    2020-07-02
    老师,你好.存在以下问题,麻烦解答下: 1.对文中解释的Latest与Current感觉区分不清楚,老能能详细举例介绍下; 文中是禁止提交位移,那么这两个应该都是一样,最新的位移位置. 要是可以提交位移呢;不是特别懂 2.第一条评论中,current主要是为了调试场景,要是有提交位移(50->100),那么重新上线使用current策略,最新提交位移是不是100;
    展开

    作者回复: Latest 策略表示把位移重设成最新末端位移,也就是LEO。Current 策略表示将位移调整成消费者当前提交的最新位移,是消费者已经提交的位移,后者必然小于等于LEO

    共 2 条评论
    3
  • 无菇朋友
    2019-09-18
    老师 问一下 如果 我想针对某个分区重置位移,怎么做

    作者回复: KafkaConsumer.seek方法支持指定单个分区进行重设位移

    3
  • JasonZhi
    2019-08-30
    老师,不是还可以通过auto offset reset配置项重设位移吗?怎么这里没有说

    作者回复: 这是说的是手动设置位移的情况,自动设置位移是Kafka自动做的。当然也算是重设位移的一种

    3
  • 无菇朋友
    2019-08-26
    请问老师,current这个选项的应用场景是什么?

    作者回复: 如果你上线的consumer程序有bug,需要重演自上线起处理过的消息,那么可以考虑使用这个策略

    3
  • 锋芒
    2019-08-23
    请问,用命令行重设位移,应该在当前group 的leader 节点上?

    作者回复: 不需要的

    3
  • Curry
    2020-05-05
    请问如何不停止程序去指定位移呢?

    作者回复: 目前可以通过Consumer API的seek方法来实现

    共 3 条评论
    2
  • helloworld
    2020-03-16
    这里的--all-topics是什么含义呢?为什么Shift-By-N、DateTime与Duration均没有?是不是可以这么认为:通过脚本的方式设置位移只能设置Topic的所有位移,并且所设置的位移会使各个分区都一样?而不能单独设置某个分区的位移??

    作者回复: 使用consumer-groups命令可以设置单个分区的位移,文中只是用--all-topics举个例子。比如指定--topics topic1:0,1,2

    共 2 条评论
    2
  • chp
    2020-03-01
    老师,为啥我指定消费者位移没效果 ------------------------------------------------- String topic = "foo"; try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) { consumer.subscribe(Collections.singleton(topic)); ConsumerRecords consumerRecords = consumer.poll(0); Iterable<ConsumerRecord<String, String>> iterable = consumerRecords.records("foo"); for (Iterator iterator = iterable.iterator(); iterator.hasNext();) { ConsumerRecord<String, String> consumerRecord = (ConsumerRecord)iterator.next(); log.info("topic = {}, partition = {}, offset = {}, key = {}, value = {}\n", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()); } for (PartitionInfo info : consumer.partitionsFor(topic)) { TopicPartition tp = new TopicPartition(topic, info.partition()); consumer.seek(tp, 3L); } } ---------------------------------------------------------- 没报错,就是设置了位移后,offset没变化
    展开

    作者回复: 后面有执行poll方法了没?

    2
  • 归零
    2021-02-22
    老师,文中说你要创建的消费者程序,要禁止自动提交位移。为什么重设位移需要禁止自动提交呢?二者有什么关系吗?谢谢🙏

    作者回复: 并不是说重设位移和禁止自动提交一定是互斥的。只是你既然要重设位移说明你要亲自对位移进行管理,那么最好禁掉自动提交

    1
  • 尚小树
    2020-09-26
    老师好。我遇到个问题,在消费消息过程中要查询数据库中的记录,但是消息来的时候库中记录不一定存在,大概延时0-30秒,这样就导致消费逻辑不成功。经讨论保证记录在消息前写入不可行。 所以只能重复消费,我用的办法开个线程睡一会再重新把不成功的消息写回topic中,如果几次不成功就记录错误日志不会永远循环。 想问老师这样做有没有什么问题,有没有更好的解决方案呢☺️谢谢老师。
    展开

    作者回复: 这取决于记录不存在的比例。是大部分时候都不存在还是只有偶发的不存在。如果是后者可以考虑将不存在的零星消息缓存起来以待后面重新处理。如果是前者,则需要通盘考虑消费逻辑。

    共 2 条评论
    1
  • 对与错
    2020-07-30
    请问ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"这个配置和seek()方法之间有关系吗?比如我设置的是earliest,但是使用的是seekToEnd

    作者回复: ConsumerConfig.AUTO_OFFSET_RESET_CONFIG是出现位移重设置时的策略。平时位移设置的策略不取决于这个参数

    1