kafka 重置offset

作者: 無敵兔八哥 | 来源:发表于2018-01-12 16:12 被阅读140次
#!/usr/bin/env python
# coding:utf-8

import sys
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

if __name__ == '__main__':

    if (len(sys.argv) < 6):
        print("usage <kafkaHost> <kafkaPort> <groupid> <topic> <partition> <offset>")
        sys.exit(0)

    kafkaHost = sys.argv[1]
    kafkaPort = sys.argv[2]
    groupid = sys.argv[3]
    topic = sys.argv[4]
    partition = int(sys.argv[5])
    offset = int(sys.argv[6])

    # init kafka consumer
    consumer = KafkaConsumer(group_id=groupid,
                             bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                                 kafka_host=kafkaHost, kafka_port=kafkaPort))

    # 分配topic and partition
    consumer.assign([TopicPartition(topic, partition)])

    offsets = {}
    meta = consumer.partitions_for_topic(topic)
    offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, meta)
    consumer.seek(TopicPartition(topic, partition), offset)

    consumer.commit(offsets)

相关文章

网友评论

    本文标题:kafka 重置offset

    本文链接:https://www.haomeiwen.com/subject/wfmfoxtx.html