美文网首页图解Kafka
KafkaProducer(2) 关键属性

KafkaProducer(2) 关键属性

作者: _孙行者_ | 来源:发表于2020-09-09 09:49 被阅读0次
    private final Logger log;
    // producer 的序列id , 用在事务中的
    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "kafka.producer";
    // producer network线程的名称前缀
    public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
    public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";

    // producer 的实例ID
    private final String clientId;
    // Visible for testing
    // producer 实际生产中的一些指标数据 , 不做分析
    final Metrics metrics;
    // partition 选择器
    private final Partitioner partitioner;
    // 一次请求最大的数据量
    private final int maxRequestSize;
    // 一个producer 实例中, BufferPool 的最大占用内存大小
    private final long totalMemorySize;
    // 生产者的元数据 , topic , partition 等
    private final ProducerMetadata metadata;
    // record 分配器 , 负责将 record 写到缓存 , sender 发送数据里,负责将数据从缓存里读取出来
    private final RecordAccumulator accumulator;
    // producer 持有的 sender  , 异步发送缓冲区中的 records
    private final Sender sender;
    // deamon线程,执行sender 
    private final Thread ioThread;
    // 数据的压缩类型
    private final CompressionType compressionType;
    private final Sensor errors;
    private final Time time;
    // key 的序列化器
    private final Serializer<K> keySerializer;
    // value 的序列化器
    private final Serializer<V> valueSerializer;
    // 生产者的配置 key , 及相关的默认配置
    private final ProducerConfig producerConfig;
    // 写缓存的最大等待时长
    private final long maxBlockTimeMs;
    // producer 关键节点的拦截器 , 可以修改相关参数和行为
    private final ProducerInterceptors<K, V> interceptors;
    // 当前API的版本 , 来决定 和 broker 的API 是否兼容
    private final ApiVersions apiVersions;
    // 事务管理器
    private final TransactionManager transactionManager;


    //构造方法 
KafkaProducer(Map<String, Object> configs, //属性配置
                  Serializer<K> keySerializer,    //key 序列化器
                  Serializer<V> valueSerializer,  //value 序列化器
                  ProducerMetadata metadata,  //生产者元数据
                  KafkaClient kafkaClient,    //持有网络信息的 client 对象
                  ProducerInterceptors interceptors,  //自定义的拦截器
                  Time time) {  // 默认是系统当前时间
        // 将 Map 的配置 , 转为 ProducerConfig 对象 . 
        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
                valueSerializer));
        try {
            // 取出设置了的 配置 , 即不使用默认配置的配置
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = time;
            // 如果配置了  transactionalId , 那么取 transactionalId . 否则就是 null 
            String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                    (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
            // 构造 clientid , 1: 取配置 , 2:取transactionalId , 3: 通过 PRODUCER_CLIENT_ID_SEQUENCE 设置
            this.clientId = buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG), transactionalId);

            LogContext logContext;
            // 设置基本的 log日志格式
            if (transactionalId == null)
                logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
            else
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
            log = logContext.logger(KafkaProducer.class);
            log.trace("Starting the Kafka producer");
            // 构造一个 clientid的map
            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            // 初始化 partitioner 的执行器 , 默认是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            // producer send失败 , 到下一次retry 的等待时长
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                         Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                           Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                // 忽略默认的配置
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // load interceptors and make sure they get clientId
            // 加载 拦截器 , 并确保 拦截器能获取 clientid 参数
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
            List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
            if (interceptors != null)
                this.interceptors = interceptors;
            else
                this.interceptors = new ProducerInterceptors<>(interceptorList);
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
                    valueSerializer, interceptorList, reporters);
            // 一次请求 , 最大的数据量 , 默认 1M
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            // bufferPool 的内存总量 ,  默认 32M
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            // 压缩类型 , 默认 none , 不压缩
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
            //写缓存 , 内存不足时的最大等待时长
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            // 事务管理器 , 不启用事务时 , 默认为 null
            this.transactionManager = configureTransactionState(config, logContext, log);
            // 生产者发送完请求接受服务器ACk的时间,该时间允许重试 ,该配置应该大于request.timeout.ms + linger.ms。 , 默认 2 分钟
            int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

            this.apiVersions = new ApiVersions();
            // 初始化 RecordAccumulator
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), // 取 batch.size 配置
                    this.compressionType,  //压缩类型
                    lingerMs(config),  // 取发送的等待时长
                    retryBackoffMs,  //重试的等待时长
                    deliveryTimeoutMs,  // 传输的等待时长
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,  //事务管理器
                    // 初始化缓存池 
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
            // 取配置的网络相关配置
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                    config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                    config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
            // 初始化元数据
            if (metadata != null) {
                this.metadata = metadata;
            } else {
                this.metadata = new ProducerMetadata(retryBackoffMs,
                        config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                        logContext,
                        clusterResourceListeners,
                        Time.SYSTEM);
                this.metadata.bootstrap(addresses, time.milliseconds());
            }
            this.errors = this.metrics.sensor("errors");
             // 初始化 sender 对象
            this.sender = newSender(logContext, kafkaClient, this.metadata);
            // sender 线程的名称
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            // 初始化 sender 线程
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            //启动 sender 线程
            this.ioThread.start();
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(Duration.ofMillis(0), true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的

相关文章

  • KafkaProducer(2) 关键属性

    如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的

  • 8. kafka分区

    分区策略 构造KafkaProducer代码如下: 属性partitioner.class就是决定消息如何分区的,...

  • Kafka系列之(5)——Kafka Producer源码解析

    KafkaProducer源码解析 KafkaProducer使用示例 (1)、KafkaProducer的sen...

  • 消息队列之Kafka-生产者

    1、发送模式 KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例...

  • KafkaProducer

    Kafka源码阅读(一):Kafka Producer整体架构概述及源码分析 zqhxuyuan Kafka源码分...

  • 【Kafka零基础学习】Kafka Producer整体流程

    整体流程图 KafkaProducer类的实例对象可被多个线程使用。 KafkaProducer实例对象追加mes...

  • Day08 Java面向对象——多态

    关键字super 1)super,相较于关键字this,可以修饰属性、方法、构造器2)super修饰属性、方法:在...

  • kafka顺序消息踩坑记录

    Kafka顺序消息消息 1.消息发送的api 2.KafkaProducer.class 封装获取partiti...

  • Java - Day6

    static关键字 1, 使用static声明属性 static声明全局属性 2, 使用static声明方法 ...

  • 关键属性

    最近接触到一个重要概念:关键属性。 每件事物包括每个人都有很多属性,我们对之判断时,通常会拿属性进行比较。比较的属...

网友评论

    本文标题:KafkaProducer(2) 关键属性

    本文链接:https://www.haomeiwen.com/subject/knmjektx.html