美文网首页
Spark Streaming + Kafka

Spark Streaming + Kafka

作者: 歌哥居士 | 来源:发表于2019-03-29 16:09 被阅读0次

Kafka Receiver

<!-- Spark Streaming整合Kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

本地测试

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streamin对接Kafka的方式一
  */
object KafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {

    // 检查参数以及初始化
    if (args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
    }
    val Array(zkQuorum, group, topics, numThreads) = args
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KafkaReceiverWordCount")
      .set("spark.driver.host", "localhost")
    val ssc = new StreamingContext(conf, Seconds(5))


    // 关键代码
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).print()

    // ~
    ssc.start()
    ssc.awaitTermination()
  }
}

运行
先启动zookeeper和kafka
运行时加入参数: host000:2181 group_test hello_test 1

服务端测试

修改代码
//      .setMaster("local[2]")
//      .setAppName("KafkaReceiverWordCount")
//      .set("spark.driver.host", "localhost")
$ mvn clean package -DskipTests
$ scp spark-learning-1.0-SNAPSHOT.jar user000@host000:~/jars
$ spark-submit  --master local[2] \
--class KafkaReceiverWordCount \
--packages  org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \
~/jars/spark-learning-1.0-SNAPSHOT.jar host000:2181 group_test hello_test 1

Kafka Direct

我的Kafka依赖使用的是0.9.0.0,会报错:kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker。所以将Kafka版本改成0.8.2.1。
<!-- Spark Streaming整合Kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

本地测试

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streamin对接Kafka的方式二
  */
object KafkaDirectWordCount {
  def main(args: Array[String]): Unit = {

    // 检查参数以及初始化
    if (args.length != 2) {
      System.err.println("Usage: KafkaReceiverWordCount <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers,topics) = args
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KafkaDirectWordCount")
      .set("spark.driver.host", "localhost")
    val ssc = new StreamingContext(conf, Seconds(5))

    // 关键代码
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = topics.split(",").toSet
    val messages = KafkaUtils.createDirectStream
      [String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).print()

    // ~
    ssc.start()
    ssc.awaitTermination()
  }
}

先启动zookeeper和kafka
运行时加入参数: host000:9092 hello_test

服务端测试

修改代码
//      .setMaster("local[2]")
//      .setAppName("KafkaDirectWordCount")
//      .set("spark.driver.host", "localhost")
$ mvn clean package -DskipTests
$ scp spark-learning-1.0-SNAPSHOT.jar user000@host000:~/jars
$ spark-submit  --master local[2] \
--class KafkaDirectWordCount \
--packages  org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \
~/jars/spark-learning-1.0-SNAPSHOT.jar host000:9092 hello_test

相关文章

网友评论

      本文标题:Spark Streaming + Kafka

      本文链接:https://www.haomeiwen.com/subject/xpicvqtx.html