美文网首页
Spark使用总结-Scala

Spark使用总结-Scala

作者: slowrabbit | 来源:发表于2019-09-29 19:31 被阅读0次

Scala Spark使用

特殊引用

使用比如hive sql或者rdd转换toDF是通过隐式转换,需要增加相关的包引用
1 hive sql: import spark.sql
2 隐式函数: import spark.implicits._

SparkSession

初始化

val sparkSession = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .enableHiveSupport() //支持hive
  .getOrCreate()

json

1 读取json

sparkSession
  .read
  .schema(...)
  .json(path)

2 读取有效hdfs路径文件:有的时候按照时间范围获取文件,但有可能其中有的小时没有生成文件,则可:
1)hdfs:

import org.apache.hadoop.fs.{FileSystem, Path}
val fs: FileSystem = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration) 
val validPaths = pathList.flatMap( p => fs.globStatus(new Path(p)).map(_.getPath.toString))
  1. OSS:可以用阿里云的OSSClient,如果没有,则循环try-cache来获取:
def readMultiOssPaths(spark: SparkSession, schema: StructType, paths: List[String], colNameList: Array[String]) = {
    var Df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    for (path <- paths) {
      try {
        val singleDF = spark
          .read
          .schema(schema)
          .json(path)
          .select(colNameList.map(col): _*)
        Df = Df.union(singleDF)
      } catch {
        case e: Exception => println(s"${path} exception: ${e.getMessage}")
      }
    }
    Df
  }
}

3 写入hdfs或者oss文件系统

sparkSession
  .write
  .mode("override")//支持覆盖写入
  .format("json")
  .option("compression", "gzip")
  .save(path)

mysql

在spark-submit时,需要--jars yourpath/json-serde-xxx-jar-with-dependencies.jar
1 读取mysql表
基本模式:

sparkSession
  .read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", jdbcUrl)
  .option("dbtable",table)
  .load()

分区模式:

sparkSession
  ...
  .option("partitionColumn", partitionCol) //分区依赖的列,选择均匀避免倾斜
  .option("lowerBound", 1) //partitionColumn最小值
  .option("upperBound", Long.MaxValue)//partitionColumn最大值
  .option("numPartitions", numPartition)//分区数
  ...

注意:join hive是无法支持的,需要将join的结果作为dbtable传入

val table = s"""(
       |select
       |    a.id,
       |    b.id,
       |    a.name,
       |    unix_timestamp(a.create_time) * 1000 as create_timestamp,
       |from
       |    a
       |    inner join
       |        b
       |    on
       |        (
       |         a.id = b.a_id and
       |         a.create_time >= '2019-09-18 00:00:00' and
       |         a.create_time < '2019-09-19 00:00:00' and
       |         a.name is not null
       |        )
       |) as oc
       |""".stripMargin
val aJoinBDF = sparkSession
  .read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", jdbcUrl)
  .option("dbtable",table)
  .load()    

Spark Dataframe

转rdd

df.rdd

rdd 转DF

如果是定义好的POJO,当rdd map或者其他操作转换出来的是POJO的对象,则可以直接toDF为以对象属性为列

column操作

1 全部修改

val columnMap = Map(旧列名->新列名)
val newDF = oldDF.select(oldDF.columns.map(c => col(c).as(columnMap(c))): _*)

2 修改一个列名

DF.withColumnRenamed("old_col_name", "new_col_name")

3 合并一些列名为一个新的列名

val newDF = oldDF
  .withColumn("combined", struct("*"))//*可以替换为具体的一些列名,比如abc则为struct("a", "b", "c")

4 删掉列名
DF.drop("col_a", "cod_b")
5 选取列名
DF.select("a", "b", ...)
6 过滤
DF.filter(col("a") === "value" || col("b") =!= "value")

Dataframe join

join后,如果有同名的列名,join结果是同时有两个相同名称的列名。暂没有什么高级的处理方法,需要区分时,提前将不同DF中的相同列名重命名为不同的列名
1 使用col

val joined = aDF.as("aDF")
  .join(
    bDF.as("bDF"),
    col("aDF.id") === col("bDF.id"),
    "left"
  )

2 使用seq(适合不用区分两个表的相同col名称)

val joined = aDF
  .join(
    bDF,
    Seq("id"),
    "left"
  )

分组

有时需要将不同行按照相同的某列进行分组,然后进行计算,此时需要用groupBy分组,分组后是List(Record, Record)

DF
  .groupBy("col_name")
  .agg(collect_list("grouped_col_name").as("grouped_col_name"))

去重

有的时候需要取比如每个公司每个产品最新的一条结果,则需要对数据进行去重,并取时间最近的一条

val groupColNames = List("company", "product")
val tmColName = "create_timestamp"
val w =  Window.partitionBy(groupColNames.map(col): _*).orderBy(col(tmColName).desc)
var newDf = df
  .withColumn("rn", row_number.over(w))
  .where(col("rn") === 1)
  .toDF()

rdd

由于可能存在一些复杂的计算、合并,需要将DataFrame的转为rdd进行处理

解析

Dataframe转rdd后,每一行都是一个org.apache.spark.sql.Row
1 获取Column: row.getAs[...]("column_name")。这种支持基本的String,Long, 甚至Map
2 在DataFrame中,如果没有指定Map或者Array,则这些格式Spark是会指定为Struct类型,解析需:

  1. Map中值解析
val mapCol = row.getAs[Row]("map_col")
val a = mapCol.getAs[String]("a")

2)Array中值解析

val arrCols = row.getAs[Seq[Row]]("array_col")
for(col <- array_col){
    val colA = col.getAs[String]("a")
}

拍平

1)如果结果是List,使用flatMap:rdd.map(r => r)
2)如果结果是(key, List(...)), 使用flatmap:

rdd
  .flatMap{
    case (key, arr) => {
      arr.map(ele => (key, ele))
    }
  }

相关文章

  • Spark原理简述

    Spark 学习: spark 原理简述与 shuffle 过程介绍 简述总结 Spark 是使用 scala 实...

  • Java+Scala混合开发Spark应用

    Java+Scala混合开发Spark应用 我主要使用Spark GraphX 的api,但是使用Scala对项目...

  • Spark使用总结-Scala

    Scala Spark使用 特殊引用 使用比如hive sql或者rdd转换toDF是通过隐式转换,需要增加相关的...

  • Spark编程实践

    使用Scala和Java编写Spark程序。 Spark编程 Scala实现 sbt 首先安装sbt 赋予权限 添...

  • Spark环境安装与简单的WordCount

    我们使用开发语言scala编写Spark计算 scala的安装:下载 scala-2.11.8.tgz解压并配置环...

  • Spark案例

    Spark 实战,第1 部分: 使用Scala 语言开发Spark 应用程序 Spark 实战, 第 2 部分:使...

  • Spark通信原理之Python与JVM的交互

    我们知道Spark平台是用Scala进行开发的,但是使用Spark的时候最流行的语言却不是Java和Scala,而...

  • SPARK的学习

    Spark着重学习这几点: scala 语言 Spark编程RDD 的理解使用DStream 的理解与使用 sca...

  • 搭建scala开发环境

    最近总结了下scala的基础知识,分享下 scala简介 spark的原生语言是Scala,具体的大家可以自行百度...

  • 二种方法实现Spark计算WordCount

    1.spark-shell 2.Scala for idea 最后,需要使用spark submit提交到spar...

网友评论

      本文标题:Spark使用总结-Scala

      本文链接:https://www.haomeiwen.com/subject/wovructx.html