美文网首页
Flink从Kafka读数据

Flink从Kafka读数据

作者: yayooo | 来源:发表于2019-08-23 00:01 被阅读0次

package com.atguigu.apiTest

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object SourceTestKafka {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //从kafka读取数据
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","hadoop102:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    //创建一个source
    /**
      * (String topic, DeserializationSchema<T> valueDeserializer, Properties props)
      * (topic ,值的反序列化工具,Properties)
      */
    val stream3: DataStream[String] = env.addSource( new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(),properties))

    stream3.print("stream3: ").setParallelism(1)

    env.execute()
  }
}

启动kafka一个生产者

bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic sensor

输入 sensor_1, 1547718199, 35.80018327300259

思考:保证数据一致性
如果挂掉了怎么保证数据一致性,设置检查点,状态存盘
(场景:在处理一条数据时,又来了一条数据,已经读进来了,如果此时处理的数据出错,回滚到的却是后来的这条数据的偏移量,也就导致了数据的丢失)
偏移量:

  • spark的两种处理方式:
    1.等数据消费完再对偏移量修改
  1. 数据回滚后,手动提交修改偏移量
  • Flink中的处理方式:
    1.它是一条一条的读
    2.本身是有状态的
  1. FlinkKafkaConsumer实现了手动修改偏移量

相关文章

网友评论

      本文标题:Flink从Kafka读数据

      本文链接:https://www.haomeiwen.com/subject/oxhasctx.html