// Instance logger; the constructor obtains it from a LogContext whose prefix carries the client id
// (and the transactional id when one is configured).
private final Logger log;
// Process-wide counter starting at 1. Its use is not visible in this section;
// presumably it generates a default client id when none is configured — verify in buildClientId.
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
// Prefix handed to the JmxReporter and to AppInfoParser.registerAppInfo in the constructor.
private static final String JMX_PREFIX = "kafka.producer";
// Name prefix of the producer's network (sender) thread; the constructor appends " | " + clientId.
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
// Metric group name passed to the RecordAccumulator and the BufferPool.
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
// Identifier of this producer instance, produced by buildClientId in the constructor.
private final String clientId;
// Visible for testing
// Metrics registry of this producer (not analysed further in these notes).
final Metrics metrics;
// Partition selector, instantiated from ProducerConfig.PARTITIONER_CLASS_CONFIG.
private final Partitioner partitioner;
// Maximum size of a single request, read from ProducerConfig.MAX_REQUEST_SIZE_CONFIG.
private final int maxRequestSize;
// Total memory the BufferPool of this producer may use, read from ProducerConfig.BUFFER_MEMORY_CONFIG.
private final long totalMemorySize;
// Producer-side metadata (topics, partitions, ...).
private final ProducerMetadata metadata;
// Record accumulator: buffers records written by the producer; the sender reads the
// buffered data back out in order to transmit it.
private final RecordAccumulator accumulator;
// Sender owned by this producer; asynchronously sends the buffered records.
private final Sender sender;
// Daemon thread that runs the sender (created as a KafkaThread with daemon = true).
private final Thread ioThread;
// Compression type applied to the data, read from ProducerConfig.COMPRESSION_TYPE_CONFIG.
private final CompressionType compressionType;
// Sensor registered under the name "errors".
private final Sensor errors;
// Clock supplied to the constructor.
private final Time time;
// Serializer for record keys.
private final Serializer<K> keySerializer;
// Serializer for record values.
private final Serializer<V> valueSerializer;
// Producer configuration (keys together with their defaults).
private final ProducerConfig producerConfig;
// Maximum time to block while writing to the buffer, read from ProducerConfig.MAX_BLOCK_MS_CONFIG.
private final long maxBlockTimeMs;
// Interceptors invoked at key points of the producer; they may alter parameters and behaviour.
private final ProducerInterceptors<K, V> interceptors;
// API versions in use; presumably consulted to decide compatibility with the broker APIs — verify.
private final ApiVersions apiVersions;
// Transaction manager, obtained from configureTransactionState in the constructor.
private final TransactionManager transactionManager;
/**
 * Package-private constructor that wires up every internal component of the producer and
 * starts the sender thread.
 *
 * @param configs         raw configuration entries; converted into a {@link ProducerConfig}
 * @param keySerializer   key serializer to use, or {@code null} to instantiate and configure the
 *                        class named by {@code ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}
 * @param valueSerializer value serializer to use, or {@code null} to instantiate and configure the
 *                        class named by {@code ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}
 * @param metadata        metadata to use, or {@code null} to create a new {@link ProducerMetadata}
 *                        and bootstrap it with the configured addresses
 * @param kafkaClient     network client handed to {@code newSender}
 * @param interceptors    interceptors to use, or {@code null} to wrap the instances created from
 *                        {@code ProducerConfig.INTERCEPTOR_CLASSES_CONFIG}
 * @param time            clock used for metrics, the accumulator, the buffer pool and timestamps
 * @throws KafkaException if any step fails; {@code close(Duration.ofMillis(0), true)} is called
 *                        first so that already-constructed components are released
 */
KafkaProducer(Map<String, Object> configs,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors interceptors,
              Time time) {
    // Turn the raw map (plus any supplied serializers) into a ProducerConfig.
    ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
            valueSerializer));
    try {
        // The entries the user explicitly provided, as opposed to defaults.
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = time;
        // Take the transactional id if one was provided, otherwise null.
        String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
        // Derive the client id from the configured client id and the transactional id.
        this.clientId = buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG), transactionalId);
        LogContext logContext;
        // Every log line is prefixed with the client id (and the transactional id when present).
        if (transactionalId == null)
            logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
        else
            logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
        log = logContext.logger(KafkaProducer.class);
        log.trace("Starting the Kafka producer");
        // All metrics of this producer are tagged with its client id.
        Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                .tags(metricTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        // A JMX reporter is always added on top of the configured reporters.
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        // Instantiate the configured partitioner.
        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
        // How long to wait after a failed send before the next retry.
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.keySerializer.configure(config.originals(), true);
        } else {
            // An instance was supplied, so the serializer class entry is not needed;
            // ignore() presumably keeps logUnused() from reporting it — verify.
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.valueSerializer.configure(config.originals(), false);
        } else {
            // Same reasoning as for the key serializer above.
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }
        // load interceptors and make sure they get clientId
        userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
        List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
        if (interceptors != null)
            this.interceptors = interceptors;
        else
            this.interceptors = new ProducerInterceptors<>(interceptorList);
        // Pass the resolved serializers (the fields), not the constructor parameters: the
        // parameters are null whenever the serializers were instantiated from config above,
        // and those instances would otherwise never be offered as cluster resource listeners.
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(this.keySerializer,
                this.valueSerializer, interceptorList, reporters);
        // Maximum size of a single request (the original notes give the default as 1 MB — verify).
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        // Total memory of the buffer pool (the original notes give the default as 32 MB — verify).
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        // Compression type (the original notes give the default as "none" — verify).
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
        // Maximum time to block when writing to the buffer while memory is short.
        this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        // Transaction manager (the original notes say it is null when transactions are not enabled — verify).
        this.transactionManager = configureTransactionState(config, logContext, log);
        // Upper bound on the time to receive an acknowledgement after sending, retries included.
        // Per the original notes it should exceed request.timeout.ms + linger.ms and defaults to
        // 2 minutes — verify against the producer configuration reference.
        int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
        this.apiVersions = new ApiVersions();
        // Build the record accumulator together with its buffer pool.
        this.accumulator = new RecordAccumulator(logContext,
                config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), // batch.size
                this.compressionType,                            // compression type
                lingerMs(config),                                // linger time before sending
                retryBackoffMs,                                  // retry back-off
                deliveryTimeoutMs,                               // delivery timeout
                metrics,
                PRODUCER_METRIC_GROUP_NAME,
                time,
                apiVersions,
                transactionManager,                              // transaction manager
                // buffer pool sized by the total memory and the batch size
                new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
        // Parse and validate the configured bootstrap servers.
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
        // Use the supplied metadata, or create and bootstrap a fresh one.
        if (metadata != null) {
            this.metadata = metadata;
        } else {
            // NOTE(review): Time.SYSTEM is used here rather than the injected 'time' — confirm this is intentional.
            this.metadata = new ProducerMetadata(retryBackoffMs,
                    config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    logContext,
                    clusterResourceListeners,
                    Time.SYSTEM);
            this.metadata.bootstrap(addresses, time.milliseconds());
        }
        this.errors = this.metrics.sensor("errors");
        // Create the sender.
        this.sender = newSender(logContext, kafkaClient, this.metadata);
        // Name of the sender thread.
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        // Create the sender thread as a daemon and start it.
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
        close(Duration.ofMillis(0), true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的
网友评论