美文网首页hive
spark读写mysql、hive、kafka数据demo

spark读写mysql、hive、kafka数据demo

作者: bigdata_er | 来源:发表于2018-11-06 15:52 被阅读29次

读取hive库数据

pom.xml依赖配置

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

读取hive数据demo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
              .setMaster("spark://master:7077")//申明spark运行模式
              .setAppName("risk")//设置job名称(可不写)
    val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
                .config(conf)
                .enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
                .getOrCreate()
    import spark.implicits._
    spark.sql("use bmkp")
    val df= spark.sql("select * from customer")//在hive中执行sql语句,返回DataSet格式数据
    df.show()
    spark.stop()
  }
}

读取mysql数据

pom.xml配置文件

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

读取mysql数据demo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
                  .setMaster("spark://master:7077")//申明spark运行模式
                  .setAppName("risk")//设置job名称(可不写)
    val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
                 .config(conf)
                 .enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
                 .getOrCreate()
//读取mysql中数据,返回数据类型为DataSet
val df = spark.read
        .format("jdbc")
        .options(Map("url" ->   
//配置mysql连接参数,包括mysql ip 端口  数据库名称 登录名和密码
"jdbc:mysql://***.***.***.***:3036/bmkpstress?user=root&password=**********",
//定义驱动程序
         "driver"->"com.mysql.jdbc.Driver",
//编写sql  在mysql中执行该sql并返回数据
         "dbtable" -> "(select * from test group by id) as aaa"))
          .load()
    spark.stop()
  }
}

SPARKSTREAMING读取kafka数据

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

读取kafka数据demo

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object Main {
  def main(args:Array[String]):Unit={
    val conf = new SparkConf().setMaster("spark://master:7077")
      .setAppName("kafka_hive");
    val spark = SparkSession.builder().master("spark://master:7077").config(conf).enableHiveSupport().getOrCreate()
    var ssc = new StreamingContext(conf, Seconds(10));
    var topics = Array("service_cksc","service_ckxc","service_dcyy");//kafka  topic名称
    var group = "bmkp" //定义groupID
    val kafkaParam = Map(   //申明kafka相关配置参数
      "bootstrap.servers" -> "***.104.42.127:19092,***.104.202.222:19092,***.135.73.152:19092", //kafka 集群IP及端口
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> group, //定义groupID
      "auto.offset.reset" -> "earliest",//设置丢数据模式  有 earliest,latest, none
      "enable.auto.commit" -> (false: java.lang.Boolean)//设置是否自动存储offset 这里设置为否
    );
    val offsetRanges = Array()
    var stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam))//从kafka读取数据 获取数据流
    stream.foreachRDD { rdd =>
      import spark.implicits._
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //获取offset
      /*
      这里处理从kafka获取的数据,在确定获取的数据已经存储或者处理后将该RDD的offset存储
       */
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) //存储offset
    }
  }
}

SPARK写数据到HIVE

pom.xml配置信息

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
</dependency

写数据到hive库demo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Main {
  case class Person(name:String,col1:Int,col2:String)
  def main(args:Array[String]):Unit={
    val conf = new SparkConf()
      .setMaster("spark://master:7077")//申明spark运行模式
      .setAppName("kettle")//设置job名称(可不写)
    val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
      .config(conf)
      .enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
      .getOrCreate()
    import spark.implicits._ //引入隐式转换 否则RDD无法转换成DataSet(DataFrame)
    spark.sql("use DataBaseName"//在hive中执行sql语句
    val data = spark.read.textFile("path")//读取hdfs中的文件,返回的是RDD格式数据,RDD格式数据不能直接写入hive,(这里代表任意的RDD类型数据)
      .map(x=>x.split(","))
      .map(x=>Person(x(0),x(1).toInt,x(2)))//利用用例类将RDD格式居转换成DataSetG格式数据,从而可以写入hive中
    data.toDF().createOrReplaceTempView("table1") //将DataSet格式数据映射到临时表中
    spark.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")//在hive上运行sql语句将临时表中数据抽出并存入hive中
    spark.close()
  }
}

写数据到mysql

pom.xml配置

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

spark 写数据到mysql库demo 1

  import java.sql.{Connection, DriverManager, PreparedStatement}
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.SparkSession
object Main {
  case class Blog(name: String, count: Int)
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("spark://master:7077")//申明spark运行模式
      .setAppName("kettle")//设置job名称(可不写)
    val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
      .config(conf)
      .enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
      .getOrCreate()
    //获取RDD数据  这里只是做一个实例 代表spark处理产生的所有RDD类型的数据
    val data = spark.sparkContext.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
    var conn: Connection = null//定义mysql连接
    var ps: PreparedStatement = null
    val sql = "insert into blog(name, count) values (?, ?)"//需要执行的sql语句,两个 “?”代表后面需要替换的数据
    data.foreachPartition(rdd=>
      try {
        //具体定义mysql的驱动管理器,主要设置mysql地址   端口  数据库  用户名  密码
        conn = DriverManager.getConnection("jdbc:mysql://***.***.***.***:3306/test","root", "******")
        rdd.toIterator.foreach(data => {
          ps = conn.prepareStatement(sql)
          ps.setString(1, data._1)//将需要写入mysql的数据进行映射
          ps.setInt(2, data._2)
          ps.executeUpdate()//在mysql上执行sql语句将数据插入到相应的表中
        })
      } catch {
        case e: Exception => println("Mysql Exception")
      } finally {
        if (ps != null) {
          ps.close()
        }
        if (conn != null) {
          conn.close()//关闭mysql连接
        }
      })
  }
}

写数据到mysql库demo 2

  import org.apache.spark.SparkConf
  import org.apache.spark.sql.{SaveMode, SparkSession}

object Main {
  case class Blog(name: String, count: Int)
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("spark://master:7077") //申明spark运行模式
      .setAppName("kettle")
    //设置job名称(可不写)
    val spark = SparkSession.builder() //spark-2.0采用SparkSession代替sparkContext
      .config(conf)
      .enableHiveSupport() //添加对HIVE的支持,否则无法访问hive库
      .getOrCreate()
    //获取RDD数据  这里只是做一个实例 代表spark处理产生的所有RDD类型的数据
    val data = spark.sparkContext.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
    import spark.implicits._
    val df = data.map(x=>new Blog(x._1,x._2)).toDF()//将RDD类型数据转换成DataSet类型
    df.write.mode(SaveMode.Append).format("jdbc")
      .option("url", "jdbc:mysql://***.***.***.***:3306/test")//定义mysql 地址 端口 数据库
      .option("dbtable", "blog")//定义需要插入的mysql目标表
      .option("user", "****")//定义登录用户名
      .option("password", "************")//定义登录密码
      .save()//保存数据
  }
}

相关文章

网友评论

    本文标题:spark读写mysql、hive、kafka数据demo

    本文链接:https://www.haomeiwen.com/subject/dtuuxqtx.html