美文网首页
kafka消费者丢消息的原因分析

kafka消费者丢消息的原因分析

作者: Stalary | 来源:发表于2019-06-27 10:36 被阅读0次

场景:

通过kafka消费作业,存储到数据库中,当遇到错误信息时停止提交offset,关闭消费者,发送报警短信

项目运行了很久,但是突然发现在关闭消费者后,有时候重启会遇到消息丢失的情况,于是从代码入手,查找原因,消费者代码如下

        @Override
        public void run() {
            // 构建kafka监控
            Thread thread = Thread.currentThread();
            ThreadHolder threadHolder = new ThreadHolder(this.id, this.name, thread.getState());
            THREAD_HOLDER_MAP.put(this.id, threadHolder);
            log.info("consumer task start, id = " + id);
            while (runnable) {
                int partition = 0;
                long offset = 0;
                String key = null;
                String topic = null;
                String value = null;
                try {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        partition = record.partition();
                        offset = record.offset();
                        key = record.key();
                        topic = record.topic();
                        value = record.value();
                        log.info(String.format("topic:%s, partition:%d, offset:%d, key:%s, message:%s", topic, partition, offset, key, value));
                        consumer.process(record);
                        kafkaConsumer.commitAsync();
                    }
                } catch (MyException e) {
                    // 如果是自定义的错误,就扔到redis上,保留以后处理
                    if (value != null) {
                        log.warn("error message:{}", value);
                        hashOperations.put(RedisKey.KAFKA_ERROR_KEY, CorrectUtils.join(topic, key, partition, offset), value);
                        kafkaConsumer.commitAsync();
                    }
                } catch (Exception e) {
                    log.warn("process message failure!", e);
                    this.runnable = false;
                    kafkaConsumer.close();
                    THREAD_HOLDER_MAP.replace(this.id, new ThreadHolder(this.id, this.name, Thread.State.TERMINATED));
                    log.info("consumer task shutdown, id = " + id);
                }
            }
        }

下面开始对问题进行分析,根据经验,丢消息的原因一般是offset提交的不正确,所以从commitAsync()方法入手,根据源码进行分析

    /**
     * Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partition.
     * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
     */
    @Override
    public void commitAsync() {
        commitAsync(null);
    }

可以发现这个方法的含义是提交当前拉取的offset,但是由于是在ConsumerRecords循环中调用的所以会导致一个问题

一次拉取了十条消息,第一次commit就将最大的offset提交,这样在消费到第二条消息如果遇到报错关闭消费者后,下次重启也会从第十一条开始消费,所以丢掉了未消费的九条消息,该问题只会出现在一次拉取多条的情况下

然后开始思考解决办法,有如下几个方案

  1. 将commit放置到循环内,但是自行控制offset,每次+1,这样可以保证消息的不丢失不重复,但是会造成多次请求
  2. 将commit放置到循环外,同时在消费失败后,关闭消费者之前进行commit,offset为当前offset+已经消费成功的消息数量

相关文章

  • kafka消费者丢消息的原因分析

    场景: 通过kafka消费作业,存储到数据库中,当遇到错误信息时停止提交offset,关闭消费者,发送报警短信 项...

  • kafka的基本框架概览

    Producer:消息生产者,向kafka broker发消息的客户端 Consumer:消息消费者,向kafka...

  • Kafka 消费者拉取消息流程(限流)

    [TOC] Kafka 消费者拉取消息流程分析 我先给大家模拟一下消息拉取的实际现象,这里 max.poll.re...

  • Kafka-处理消息丢失

    Kafka存在丢消息的问题,消息丢失会发生在Broker,Producer和Consumer三种。 Broker丢...

  • Kafka实际案例问题

    kafka consumer防止数据丢失 Kafka学习之怎么保证不丢,不重复消费数据 1 消费者pull数据时,...

  • kafka低进阶

    1、kafka工作流程 Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topi...

  • Kafka学习笔记(二)架构深入

    1. Kafka工作流程及文件存储机制 Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,...

  • Kafka 消费者 Java 实现

    应用程序使用 KafkaConsumer向 Kafka 订阅 Topic 接收消息,首先理解 Kafka 中消费者...

  • kafka表引擎使用

    1 创建kafka topic 2 验证生产者消费者 消费结果 消费者能成功消费到消息,kafka在本机可用 3 ...

  • kafka消息可靠性方案

    在 Kafka 工作机制 一文提及了 Kafka 消息的不可靠性。本文就 Kafka 消息的三种不可靠性(重复、丢...

网友评论

      本文标题:kafka消费者丢消息的原因分析

      本文链接:https://www.haomeiwen.com/subject/zidlcctx.html