美文网首页
PHP 处理kafka消息实例

PHP 处理kafka消息实例

作者: SuperGu | 来源:发表于2018-06-05 17:09 被阅读159次

低级方式(Low level)

这种方式没有消费组的概念

<?php

$rk = new RdKafka\Consumer();

$rk->setLogLevel(LOG_DEBUG);

// 指定 broker 地址,多个地址用"," 分割

$rk->addBrokers("127.0.0.1:9092");

// var_dump($rk);die;

$topic = $rk->newTopic("test");

$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

// var_dump($topic);die;

while (true) {

    // 第一个参数是分区号

    // 第二个参数是超时时间

    $msg = $topic->consume(0, 1000);

    if ($msg->err) {

        echo $msg->errstr(), "\n";

        break;

    } else {

        echo $msg->payload, "\n";

    }

}

高级方式 (High level)

这种方式可以指定消费组,一个消费组内,一个consumer 进程只能读取一个分区,

<?php

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)

// 当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发

$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {

    switch ($err) {

        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:

            echo "Assign: ";

            var_dump($partitions);

            $kafka->assign($partitions);

            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:

            echo "Revoke: ";

            var_dump($partitions);

            $kafka->assign(NULL);

            break;

        default:

            throw new \Exception($err);

    }

});

// 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。

$conf->set('group.id', 'myConsumerGroup1');

//添加 kafka集群服务器地址

$conf->set('metadata.broker.list', '127.0.0.1:9092');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in

// offset store or the desired offset is out of range.

// 'smallest': start from the beginning

//当没有初始偏移量时,从哪里开始读取

$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics

$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// 让消费者订阅log 主题

$consumer->subscribe(['test']);

while (true) {

echo 1;

    $message = $consumer->consume(120*1000);

    switch ($message->err) {

        case RD_KAFKA_RESP_ERR_NO_ERROR:

            var_dump($message);

            break;

        case RD_KAFKA_RESP_ERR__PARTITION_EOF:

            echo "No more messages; will wait for more\n";

            break;

        case RD_KAFKA_RESP_ERR__TIMED_OUT:

            echo "Timed out\n";

            break;

        default:

            throw new \Exception($message->errstr(), $message->err);

            break;

    }

}

相关文章

  • PHP 处理kafka消息实例

    低级方式(Low level) 这种方式没有消费组的概念

  • [转]使用PHP处理Kafka消息

    Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高...

  • php操作kafka之实例操作(生产者和消费者)

    php操作kafka之实例操作(生产者和消费者) php kafka手册地址 1.生产者 更多例子 2.消费者 更...

  • Flink-Kafka End-to-End Exactly-O

    Flink 从kafka读消息处理后写入kafka样例(Consume-Processing-Produce) 测...

  • Kafka

    Kafka 一、什么是Kafka Kafka是一个分布式的基于发布/订阅模式的消息队列,强悍的消息处理能力,主要应...

  • Kafka 概述

    Kafka 能用来干嘛? 消息队列 实时数据处理, 流式处理(一般结合storm) 日志聚合等 Kafka 架构 ...

  • Kafka消息流处理

  • kafka学习笔记-kafka基础

    参考:极客时间-Kafka核心技术与实战 kafka术语 消息(Record):kafka要处理的主要对象。 主题...

  • Kafka集群部署指南

    一、前言 1、Kafka简介 Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。...

  • Kafka集群部署指南

    一、前言 1、Kafka简介 Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。...

网友评论

      本文标题:PHP 处理kafka消息实例

      本文链接:https://www.haomeiwen.com/subject/orlmsftx.html