美文网首页
kafka服务端Offset保存时间过短导致consumer重启

kafka服务端Offset保存时间过短导致consumer重启

作者: 哈密朵 | 来源:发表于2019-07-31 16:43 被阅读0次

consumer初始化时会从broker取commit offset作为初始fetch offset来取消息,之后会继续在fetch offset上按顺序正确的往后取消息。所以正常的运行只依靠fetch offset就足够了,而且fetch offset在初始化之后就不需要用户理会,由consumer自行管理维护。 consumer的commit作为松散的支线可以在任意时间点执行,commit的意义在于尽可能及时的把消费处理的结果刷回broker去,以备consumer重启初始化或通过adminClient读取使用,所以习惯上成功消费一条就commit一次。一般来说commit offset会落后fetch offset一些,另外即使一次commit失败了也没关系,只要后序commit成功就能掩盖。

consumer的commit offset保存在broker集群的有50个partition的内部topic里,保存时间 offsets.retention.minutes (1440 minutes = 1 day) 的意思是说超过这个时间没有再次commit就删除该consumer的commit offfset,意义在于删除长期离线的consumer的commit信息。如果consumer从broker取commit时,还没有提交过offset或是已经被删除,就返回0。

broker里的topic消息log只会保留 log.retention.hours (168 hours = 7 days) ,时间点以前的消息就会被截断删除。而consumer的配置 auto.offset.reset(earliest | latest) 当consumer提供的fetch offset超出broker留存的消息log范围时,把fetch offset重置到broker留存消息的最小位或最大位。

场景描述:
因为commit offset的保存时间offsets.retention.minutes只有1天,而消息log的保存时间log.retention.hours有7天。
如果consumer是手动commit,当长时间没有新消息可以消费,也就长时间没有commit,造成commit offset被broker删除。之后一旦consumer重启,初始化时发现commit offset已经被删除,取到了0去fetch,必定会超出broker的留存消息范围,触发consumer的reset。如果reset=earliest 就会从留存的7天内的最小位消息开始消费,造成大量的重复消费。如果reset=latest 就会从最新消息开始消费,造成会丢失重启期间的消息。

结论:
不能在还有数据的时候,失去对数据消费的commit信息。不然会在consumer重启时把consumer reset到broker现有数据的最小位或最大位开始消费,造成数据重复或丢失。
反过来如果有commit信息,但数据已经被删除了,有两种情况
(1)说明consumer长期离线,这样的数据消费缺失是合理的,根据reset配置从当前现有数据的最小位或最大位开始消费。
(2)数据全部被消费过, 只是正常的过期删除,所以这并没有任何问题,也不会发生reset。
正常情况下,commit offset保存时间可以配置成消息log保存时间的2倍,如果log.retention.hours 仍然为 7 days,那 offsets.retention.minutes 可以配置成 14 days。习惯上把消息log配置保存 3 days,offsets配置保存 6 days。

相关文章

网友评论

      本文标题:kafka服务端Offset保存时间过短导致consumer重启

      本文链接:https://www.haomeiwen.com/subject/qnptdctx.html