美文网首页
kafka05 开发自定义分区器

kafka05 开发自定义分区器

作者: 6c0fe9142f09 | 来源:发表于2018-09-11 17:41 被阅读5次

开发自定义分区器

上一节我们看到,如果在发送消息的时候没有指定对应的分区,会使用默认分区器对消息进行分区,这一节我们试着写一个自己的分区器

默认分区器源码阅读
  • 进入默认分区器
org.apache.kafka.clients.producer.internals.DefaultPartitioner
**********
获取当前partition的数量
如果key==null:随机在partition中进行分配
如果key!=null:会对key进行hash取值,使用hash对partition数量进行取模
+ 所以key相同的消息,会发送到相同的分区
**********
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
自定义分区器
  • implements Partitioner
public class MyPartition implements Partitioner {
  • 重写partition、close、configure方法
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        System.out.println("Customed Partitioner is running...");
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null){
            throw new InvalidRecordException("key cannot be null..");
        }else {
            if (((String)key).equals("1")){
                return 1;
            }else {
                return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions));
            }
        }
    }

    public void close() {

    }

    public void configure(Map<String, ?> configs) {

    }
  • 使用
kafkaProps.put("partitioner.class","com.shiyanlou.MyPartition");
  • demo
package com.shiyanlou;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

import javax.rmi.CORBA.Util;
import java.util.List;
import java.util.Map;

public class MyPartition implements Partitioner {


    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        System.out.println("Customed Partitioner is running...");
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null){
            throw new InvalidRecordException("key cannot be null..");
        }else {
            if (((String)key).equals("1")){
                return 1;
            }else {
                return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions));
            }
        }
    }

    public void close() {

    }

    public void configure(Map<String, ?> configs) {

    }
}
package com.shiyanlou.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class MySecondProducer {
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "132.232.14.247:9094");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("partitioner.class","com.shiyanlou.MyPartition");

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(kafkaProps);
        ProducerRecord<String,String> record = new ProducerRecord<String, String>("mySecondTopic","1","hello kafka");

        long startTime = System.currentTimeMillis();
        for (int i=0;i<10;i++) {
            try {
                //发送前面创建的消息对象ProducerRecord到kafka集群
                //发送消息过程中可能发送错误,如无法连接kafka集群,所以在这里使用捕获异常代码
                Future<RecordMetadata> future = producer.send(record);
                //producer的send方法返回Future对象,我们使用Future对象的get方法来实现同步发送消息。
                //Future对象的get方法会产生阻塞,直到获取kafka集群的响应,响应结果分两种:
                //1、响应中有异常:此时get方法会抛出异常,我们可以捕获此异常进行相应的业务处理
                //2、响应中无异常:此时get方法会返回RecordMetadata对象,此对象包含了当前发送成功的消息在Topic中的offset、partition等信息
                RecordMetadata recordMetadata = future.get();
                long offset=recordMetadata.offset();
                int partition=recordMetadata.partition();
                System.out.println("the message  offset : "+offset+" ,partition:"+partition+"。");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        long endTime = System.currentTimeMillis();
        System.out.println(endTime-startTime);

        producer.close();
    }
}

相关文章

  • kafka05 开发自定义分区器

    开发自定义分区器 上一节我们看到,如果在发送消息的时候没有指定对应的分区,会使用默认分区器对消息进行分区,这一节我...

  • Spark-RDD分区器

    Spark中现在支持的分区器有Hash分区器和Range分区器,除此之外,用户也可以自定义分区方式。默认的分区方式...

  • 键值对RDD数据分区

    前言 Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定...

  • Flink(1.13) 中的分区器

    前言 flink中有七大官方定义的分区器以及一个用于自定义的分区器(共八个)。 org.apache.flink....

  • 用流收集数据

    归约和总结 分组 分区 收集器接口 可以不用实现Collector进行自定义收集

  • MapReduce-API(2)找出每月气温最高的2天

    思考 自定义类型分区排序比较器 数据案例 top-K:找出每月气温最高的2天 自定义类型 Map阶段 分组 对ke...

  • SparkCore之键值对RDD数据分区器

    Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分...

  • 2020-12.1-Spark-9(Spark-Core)

    1.RDD血缘关系 2.持久化:cache persist checkpoint(检查点) 3.自定义分区器(...

  • Centos安装与配置

    桥接模式:虚拟机-->路由器-->互联网NAT模式:虚拟机-->宿主机-->路由器-->互联网 自定义分区: (1...

  • PV,VG及LV的使用

    经常搞后台开发的人员,总是会涉及到系统磁盘的管理,例如创建裸设备,自定义磁盘分区,使用了一阵子后为某个分区扩展逻辑...

网友评论

      本文标题:kafka05 开发自定义分区器

      本文链接:https://www.haomeiwen.com/subject/pyiugftx.html