Kafka in Practice: Producing Messages with Spring Boot 2

Author: 大雄喵 | Published 2021-01-25 22:22

1. Environment
JDK: 1.8
Spring Boot: 2.3.8.RELEASE
Kafka: 2.7, single-node deployment

2. Create the Project
Create a new Spring Boot project in IDEA.



Select the dependencies to include.



The final pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.study</groupId>
    <artifactId>kafka-study</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-study</name>
    <description>Learning Kafka</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

3. Kafka Configuration Class
First, add a property to application.properties:

kafka.broker.list=192.168.30.128:9092

Then create a KafkaConfig configuration class (kafka.broker.list is a custom property injected with @Value below, not one of Spring Boot's standard spring.kafka.* keys):

package com.study.kafka.config;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * Kafka producer configuration
 */
@Configuration
public class KafkaConfig {

    @Value("${kafka.broker.list}")
    public String brokerList;

    public static final String TOPIC = "syslogs";

    public Properties producerConfigs() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 20000000); // ~20 MB record buffer
        // how long send() may block when the buffer is full (default is 60s); 6s here
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // number of producer retries
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // reusable ProducerBatch size in the record accumulator's BufferPool (16 KB)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // a batch is sent once it is full or after waiting LINGER_MS_CONFIG milliseconds
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return props;
    }

    @Bean
    public Producer<String, String> getKafkaProducer() {
        // KafkaProducer is thread-safe; a single instance can be shared by multiple threads.
        // Types are <String, String> to match the StringSerializer settings above.
        return new KafkaProducer<>(producerConfigs());
    }

}
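
The test in section 4 serializes a LogInfo object from com.study.kafka.domain, which the original post does not show. A minimal sketch of that class, assuming Lombok @Data and inferring the fields from the JSON output in section 5:

package com.study.kafka.domain;

import lombok.Data;

import java.util.Date;

/**
 * Log entry sent to Kafka (sketch; fields inferred from the consumer output in section 5)
 */
@Data
public class LogInfo {
    private Long id;
    private String ip;
    private String deviceType;
    private String osVersion;
    private Date createDate;
}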

4. Send Messages from a Test Class

package com.study.kafka;

import com.study.kafka.config.KafkaConfig;
import com.study.kafka.domain.LogInfo;
import kafka.utils.Json;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;

@Slf4j
@SpringBootTest
class KafkaStudyApplicationTests {

    @Autowired
    Producer<String, String> producer;

    @Test
    void sendKafkaMsg() {
        LogInfo logInfo = new LogInfo();
        logInfo.setId(10000L);
        logInfo.setIp("11.22.33.44");
        logInfo.setDeviceType("Huawei Mate40");
        logInfo.setOsVersion("11.0");
        logInfo.setCreateDate(new Date());

        // serialize to JSON (kafka.utils.Json comes from the test-scoped Kafka dependencies)
        String msg = Json.encodeAsString(logInfo);

        ProducerRecord<String, String> record = new ProducerRecord<>(KafkaConfig.TOPIC, msg);
        try {
            // send the message asynchronously; the callback fires once the broker responds
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null) {
                        log.info("Send succeeded, partition: {}, offset: {}", metadata.partition(), metadata.offset());
                    } else {
                        log.info("Send failed: {}", e.getMessage());
                    }
                }
            });
            // flush so the asynchronous send completes before the test method returns
            producer.flush();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}
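
Before running the test, the syslogs topic must exist on the broker unless auto.create.topics.enable is left at its default of true. A sketch of creating it by hand; the partition and replication-factor values are arbitrary single-node choices:

[root@master kafka_2.12-2.7.0]# ./bin/kafka-topics.sh --create --bootstrap-server 192.168.30.128:9092 --topic syslogs --partitions 1 --replication-factor 1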

5. Consume Messages with kafka-console-consumer.sh
Run the following command in a console to listen for messages:

[root@master kafka_2.12-2.7.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.30.128:9092 --topic syslogs --from-beginning

The output looks like this:

{"id":10000,"ip":"11.22.33.44","deviceType":"Huawei Mate40","osVersion":"11.0","createDate":1611583407099}
{"id":10000,"ip":"11.22.33.44","deviceType":"Huawei Mate40","osVersion":"11.0","createDate":1611583457711}
{"id":10000,"ip":"11.22.33.44","deviceType":"Huawei Mate40","osVersion":"11.0","createDate":1611583493982}
