美文网首页
game 数据分析

game 数据分析

作者: 飞起的书包 | 来源:发表于2019-02-22 13:41 被阅读3次

package kafka_producer;

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class GameDataProducer {

    public static void main(String[] args) {

        Properties props = new Properties();

        //kafka 集群配置

        props.put("bootstrap.servers", "tstkj001:6667,tstkj002:6667,tstkj003:6667");

        props.put("acks", "1");

        props.put("retries", 3);

        props.put("batch.size", 16384);

        props.put("buffer.memory", 33554432);

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        //用户 topic 和数据 自己模拟

        ProducerRecord<String, String> msg1 = new ProducerRecord<>("user", "");

        //游戏 topic 和数据 自己模拟

        ProducerRecord<String, String> msg2 = new ProducerRecord<>("game", "");

        send(producer,msg1);

        send(producer,msg2);

    }

    public static void send(KafkaProducer producer,  ProducerRecord<String, String> msg){

        producer.send(msg, new Callback() {

            @Override

            public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                if(e !=null){

                    e.printStackTrace();

                }

            }

        });

    }

}

___________________________________________________________________________________________________

import org.apache.spark.SparkConf

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Consumes records from one or more Kafka topics through a direct stream and
  * prints per-batch word counts of the record values.
  *
  * Usage: DirectKafkaWordCount <brokers> <topics>
  */
object DirectKafkaWordCount {

  /**
    * @param args exactly two entries: a comma-separated broker list and a
    *             comma-separated topic list
    */
  def main(args: Array[String]): Unit = {
    // Require exactly two arguments; the destructuring below would throw a
    // MatchError for any other count.
    if (args.length != 2) {
      System.err.println(
        """
          |Usage: DirectKafkaWordCount <brokers> <topics>
          |  <brokers> is a list of one or more Kafka brokers
          |  <topics> is a list of one or more kafka topics to consume from
          |
        """.stripMargin)
      System.exit(1)
    }

    //    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet

    // The kafka010 integration passes these straight to the new Kafka consumer,
    // which needs bootstrap.servers and both deserializers; the legacy
    // "metadata.broker.list" key is not recognised by it.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "DirectKafkaWordCount"
    )

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }

}

_____________________________________________________________________________________________________

/**
  * Test driver: launches DirectKafkaWordCount with a fixed broker list and
  * topic list.
  */
object WordCountTest {

  def main(args: Array[String]): Unit = {
    val launchArgs = Array(
      // Becomes args(0) of DirectKafkaWordCount: the Kafka cluster
      "tstkj001:6667,tstkj002:6667,tstkj003:6667",
      // Becomes args(1) of DirectKafkaWordCount: the topics
      "user,game"
    )

    DirectKafkaWordCount.main(launchArgs)
  }

}

___________________________________________________________________________________________________

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->

<dependencies>

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-streaming_2.11</artifactId>

        <version>2.2.0</version>

        <!--<scope>provided</scope>-->

    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>

        <version>2.2.0</version>

        <!--<scope>provided</scope>-->

    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-core_2.11</artifactId>

        <version>2.2.0</version>

        <!--<scope>provided</scope>-->

    </dependency>

</dependencies>

--

相关文章

网友评论

      本文标题:game 数据分析

      本文链接:https://www.haomeiwen.com/subject/mnfeyqtx.html