Spark Programming Fundamentals (Scala Edition): Spark SQL

Author: kaiker | Published 2021-08-09 19:16

1. Introduction to Spark SQL

Figure: How Hive executes SQL
  • Spark uses thread-level parallelism, whereas MapReduce uses process-level parallelism
  • Spark SQL relies on Hive only for HiveQL parsing and Hive metadata; once a query has been parsed into an abstract syntax tree, execution is handed over to Spark SQL


    Figure: Spark SQL execution process

2. DataFrame

An RDD is a distributed collection of Java objects, but the internal structure of those objects is opaque to the RDD.

A DataFrame is a distributed dataset built on top of RDDs that carries detailed structure (schema) information.

Figure: RDD vs. DataFrame
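
The difference is easiest to see in a short sketch (not from the original text; it assumes the spark-shell's built-in spark session and a hypothetical Person case class): to an RDD the objects are opaque, while a DataFrame exposes named, typed columns that Spark can reason about.

    scala> case class Person(name: String, age: Long)
    scala> val personRDD = spark.sparkContext.parallelize(Seq(Person("Alice", 29), Person("Bob", 35)))
    // the RDD only knows it holds Person objects; their fields are invisible to Spark
    scala> import spark.implicits._
    scala> val personDF = personRDD.toDF()
    // the DataFrame carries the schema: printSchema() lists the name and age columns with their types
    scala> personDF.printSchema()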

3. Creating a DataFrame

SparkSession supports loading data from different data sources and converting the data into DataFrames.

    scala> import org.apache.spark.sql.SparkSession
    scala> val spark = SparkSession.builder().getOrCreate()

    // load a JSON file into a DataFrame, then write two of its columns back out as CSV
    scala> val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
    scala> peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/sql/newpeople.csv")
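
A quick follow-up (a sketch; show() is only for inspection, and users.parquet is assumed to be one of the sample files shipped under the same resources directory of a standard Spark installation) illustrates that the same read API covers other formats as well:

    scala> peopleDF.show()  // print the loaded rows
    scala> val usersDF = spark.read.format("parquet").load("file:///usr/local/spark/examples/src/main/resources/users.parquet")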

4. Common DataFrame Operations

printSchema()

df.printSchema() // print the schema of the DataFrame

select()

df.select(df("name").as("xxxname"), df("age")).show()

filter()

df.filter(df("age") > 20).show()

groupBy()

df.groupBy("age").count().show()

sort()

df.sort(df("age").desc).show()
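
Since every operation returns a new DataFrame, these calls can be chained. A small combined sketch, reusing the df from the examples above:

// people older than 20, counted per age, in ascending age order
df.filter(df("age") > 20).groupBy("age").count().sort("age").show()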

5. Converting an RDD to a DataFrame

5.1 Inferring the schema with reflection

  • Each line of the input file is split and mapped to a case class instance: map(attributes => Person(attributes(0), attributes(1).trim.toInt))
scala> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder
scala> import spark.implicits._  // import implicits to support implicitly converting an RDD to a DataFrame
import spark.implicits._
scala> case class Person(name: String, age: Long)  // define a case class
defined class Person
scala> val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(_.split(",")).
map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> peopleDF.createOrReplaceTempView("people") // the DataFrame must be registered as a temporary view before it can be queried below
scala> val personsRDD = spark.sql("select name,age from people where age > 20")
// despite its name, personsRDD is a DataFrame; the next line is the shell's output
personsRDD: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> personsRDD.map(t => "Name: "+t(0)+ ","+"Age: "+t(1)).show()
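
With import spark.implicits._ in scope, the map above turns the Row-based result into a typed Dataset[String]. The same mechanism also lets you convert the DataFrame into a strongly typed Dataset of the case class (a short sketch, not part of the original transcript):

scala> val personsDS = spark.sql("select name, age from people where age > 20").as[Person]
scala> personsDS.map(p => "Name: " + p.name + ", Age: " + p.age).show()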

5.2 Defining the schema programmatically

  • Build the table header (the schema)
  • Build the table records
  • Combine the header and the records
  • Spark SQL provides the StructType(fields: Seq[StructField]) class to represent a table's schema
  • StructField(name, dataType, nullable): name is the field name, dataType is the field type, nullable indicates whether the field may be null
  • When building the records, each record is wrapped in a Row object, and all Row objects are stored together in an RDD
  • The header and the records are combined with spark.createDataFrame() to produce a DataFrame
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
// build the fields
scala> val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true))
fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,IntegerType,true))
scala> val schema = StructType(fields)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
// as the output shows, schema describes a table with two fields, name and age
// schema is the "table header"
// next, load the file to create an RDD
scala> val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:26
// parse every line of peopleRDD
scala> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim.toInt))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:29
// rowRDD holds the "table records"
// now combine the "header" and the "records"
scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
// the DataFrame must be registered as a temporary view before it can be queried below
scala> peopleDF.createOrReplaceTempView("people")
scala> val results = spark.sql("SELECT name,age FROM people")
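
As in Section 5.1, the query result can then be inspected (a brief usage sketch):

scala> results.show()
// with import spark.implicits._ in scope, each Row can also be rendered as a string
scala> results.map(attributes => "name: " + attributes(0) + ", age: " + attributes(1)).show()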

6. Reading from and Writing to Databases

6.1 JDBC

// read the student table from MySQL over JDBC
scala> val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()
scala> jdbcDF.show()
// write a DataFrame back to MySQL in append mode (prop holds the connection properties; see the sketch below)
studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)
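
The write-back line above assumes an existing DataFrame named studentDF and a Properties object named prop; a minimal sketch of that setup is shown below (the user, password and driver values are placeholders that must match your own MySQL configuration):

scala> import java.util.Properties
scala> val prop = new Properties()
scala> prop.put("user", "root")            // placeholder credentials
scala> prop.put("password", "hadoop")
scala> prop.put("driver", "com.mysql.jdbc.Driver")
scala> studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)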

6.2 Hive

scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.SparkSession
scala> case class Record(key: Int, value: String)
scala> val warehouseLocation = "spark-warehouse" // matches the configured Hive warehouse directory
scala> val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
scala> import spark.implicits._
scala> import spark.sql
// query the existing sparktest.student table in Hive
scala> sql("SELECT * FROM sparktest.student").show()
scala> import java.util.Properties
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
// create two lines of data representing two students
scala> val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
// define the schema
scala> val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
// create Row objects; each Row object becomes one row of rowRDD
scala> val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
// bind the Row objects to the schema, i.e. associate the data with the schema
scala> val studentDF = spark.createDataFrame(rowRDD, schema)
// inspect studentDF
scala> studentDF.show()
+---+---------+------+---+
| id|     name|gender|age|
+---+---------+------+---+
|  3|Rongcheng|     M| 26|
|  4|  Guanhua|     M| 27|
+---+---------+------+---+
// register a temporary view (registerTempTable is deprecated; createOrReplaceTempView is the current API)
scala> studentDF.createOrReplaceTempView("tempTable")
// insert the records into the Hive table
scala> sql("insert into sparktest.student select * from tempTable")
