1 Functions in pyspark
These built-in functions greatly simplify data analysis in Spark; as of Spark 2.2 there are already 307 of them, and it takes plenty of hands-on practice to become fluent with them.
Among them, the udf function lets you implement more complex, custom logic yourself.
1.1 udf (user-defined functions)
from pyspark.sql import SparkSession  # SparkSession is the unified entry point
from pyspark.sql.types import *
from pyspark.sql import functions as F

# Create the spark session object
spark = SparkSession\
    .builder\
    .appName('byRdd')\
    .getOrCreate()

employees = [(1, "John", 25), (2, "Ray", 35), (3, "Mike", 24), (4, "Jane", 28), (5, "Kevin", 26),
             (6, "Vincent", 35), (7, "James", 38), (8, "Shane", 32), (9, "Larry", 29), (10, "Kimberly", 29),
             (11, "Alex", 28), (12, "Garry", 25), (13, "Max", 31)]
schema = StructType([StructField('emp_id', IntegerType(), True),
                     StructField('name', StringType(), True),
                     StructField('age', IntegerType(), True)])
df = spark.createDataFrame(employees, schema=schema)

# Suppose we want a new column whose value, for each row, is a dict built from name and age
# Define an ordinary Python function
def get_dict(name_value, age_value):
    return {name_value: age_value}

# Convert the function into a udf with F.udf
# The first argument is the custom function itself
# The second argument is the return type (for a list or dict, the element types must also be specified)
udf_get_dict = F.udf(get_dict, MapType(StringType(), IntegerType()))

# Add a new column and let the udf process each row
df17 = df.withColumn('dict_column', udf_get_dict(df.name, df.age))
df17.show()

A udf lets you apply custom row-level processing logic that the built-in functions do not cover, filling the gaps in your data-processing pipeline. A further example is sketched below.
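For instance, the return type does not have to be a map. Here is a minimal sketch, reusing the spark session and df defined above, of a udf that returns a plain string; the function name age_group and its thresholds are made up purely for illustration:

# A made-up bucketing rule, purely for illustration
def age_group(age_value):
    if age_value < 30:
        return 'young'
    elif age_value < 36:
        return 'middle'
    else:
        return 'senior'

# Register it as a udf with a simple StringType return type
udf_age_group = F.udf(age_group, StringType())
df.withColumn('age_group', udf_age_group(df.age)).show()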
1.2 Commonly used functions
1.array (returns a new array column)
# Combine name and age into an array column on the existing dataframe
df1 = df.withColumn('arrayColumn', F.array(df.name, df.age))
# Or output just the array as a separate dataframe
df2 = df.select(F.array(df.name, df.age).alias('arrayColumn'))
df1.show()
df2.show()

From here on, the effect of each function is shown by outputting a separate dataframe.
2.array_contains (checks whether an array column contains a given value)
# Create a dataframe with an array column
df1 = spark.createDataFrame([(['a', 'b', 'c'],), (['e'],)], ['data'])
df1.show()
# Check whether 'a' is present in each row's array column: true if present, false otherwise
df1.select(F.array_contains(df1.data, 'a')).show()
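
Because array_contains returns a boolean column, it can also be used directly as a filter condition; a minimal sketch with the same df1 as above:
# Keep only the rows whose data array contains 'a'
df1.filter(F.array_contains(df1.data, 'a')).show()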

3.collect_list (aggregates a column into a list)
df3 = df.select(F.collect_list(df.age))
df3.show()

It is usually used together with groupBy, as shown below:
employees1 = [(1, "John", 25), (2, "Ray", 35), (3,"Ray", 24), (4, "Jane", 28), (5, "Kevin", 26),
(6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), (9, "Larry", 29), (10, "Larry", 29),
(11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
schema = StructType([StructField('emp_id',IntegerType(),True),
StructField('name',StringType(),True),
StructField('age',IntegerType(),True)])
df = spark.createDataFrame(employees1,schema=schema)
df.show()
# After grouping by name, collect the ages sharing the same name into a list
df4 = df.groupBy(df.name).agg(F.collect_list(df.age))
df4.show()

4.collect_set (aggregates a column into a list with duplicate elements removed)
employees1 = [(1, "John", 25), (2, "Ray", 35), (3,"Ray", 35), (4, "Jane", 28), (5, "Kevin", 26),
(6, "Vincent", 35), (7,"James", 38), (8, "Larry", 32), (9, "Larry", 29), (10, "Larry", 29),
(11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
schema = StructType([StructField('emp_id',IntegerType(),True),
StructField('name',StringType(),True),
StructField('age',IntegerType(),True)])
df = spark.createDataFrame(employees1,schema=schema)
df.show()
# Group by name and aggregate age into a list, removing duplicate elements
df1 = df.groupBy(df.name).agg(F.collect_set(df.age))
df1.show()

5.concat (concatenates multiple columns into one column, returned as a string)
employees1 = [(1, "John", 25), (2, "Ray", 35), (3,"Ray", 35), (4, "Jane", 28), (5, "Kevin", 26),
(6, "Vincent", 35), (7,"James", 38), (8, "Larry", 32), (9, "Larry", 29), (10, "Larry", 29),
(11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
schema = StructType([StructField('emp_id',IntegerType(),True),
StructField('name',StringType(),True),
StructField('age',IntegerType(),True)])
df = spark.createDataFrame(employees1,schema=schema)
df.show()
# Concatenate name and age into a single column named concat
df1 = df.withColumn('concat',F.concat(df.name,df.age))
df1.show()

6.concat_ws (concatenates multiple columns into one column using the given separator)
employees1 = [(1, "John", 25), (2, "Ray", 35), (3,"Ray", 35), (4, "Jane", 28), (5, "Kevin", 26),
(6, "Vincent", 35), (7,"James", 38), (8, "Larry", 32), (9, "Larry", 29), (10, "Larry", 29),
(11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
schema = StructType([StructField('emp_id',IntegerType(),True),
StructField('name',StringType(),True),
StructField('age',IntegerType(),True)])
df = spark.createDataFrame(employees1,schema=schema)
df.show()
# Join name and age with the separator '-' into a single column named concat_ws
df1 = df.withColumn('concat_ws',F.concat_ws('-',df.name,df.age))
df1.show()
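
concat_ws also pairs naturally with collect_list: a common pattern is to group rows and join the collected values into one delimited string. A minimal sketch with the same df as above, casting age to string first since concat_ws expects string inputs:
# Group by name, then join each group's ages into one comma-separated string
df.groupBy(df.name).agg(F.concat_ws(',', F.collect_list(df.age.cast('string'))).alias('ages')).show()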

7.countDistinct (counts the number of distinct values in a column)
employees1 = [(1, "John", 25), (2, "Ray", 35), (3,"Ray", 35), (4, "Jane", 28), (5, "Kevin", 26),
(6, "Vincent", 35), (7,"James", 38), (8, "Larry", 32), (9, "Larry", 29), (10, "Larry", 29),
(11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
schema = StructType([StructField('emp_id',IntegerType(),True),
StructField('name',StringType(),True),
StructField('age',IntegerType(),True)])
df = spark.createDataFrame(employees1,schema=schema)
df.show()
# Count the number of distinct names and return the result as a dataframe
df1 = df.agg(F.countDistinct(df.name).alias('countDistinct'))
df1.show()
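
countDistinct can also be used inside a grouped aggregation; a minimal sketch with the same df as above, counting how many distinct names appear for each age:
# For each age, count how many distinct names share it
df.groupBy(df.age).agg(F.countDistinct(df.name).alias('distinct_names')).show()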

8.explode (produces a new row for each element of an array or map column, i.e. it flattens the collection into rows)
First usage: exploding an array column directly
employees1 = [(1, "John", 25), (2, "Ray", 35), (3,"Ray", 35), (4, "Jane", 28), (5, "Kevin", 26),
(6, "Vincent", 35), (7,"James", 38), (8, "Larry", 32), (9, "Larry", 29), (10, "Larry", 29),
(11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
schema = StructType([StructField('emp_id',IntegerType(),True),
StructField('name',StringType(),True),
StructField('age',IntegerType(),True)])
df = spark.createDataFrame(employees1,schema=schema)
df.show()
# First, using the aggregation shown above, collect the ages sharing the same name into a list
df1 = df.groupby(df.name).agg(F.collect_list(df.age).alias('list_column'))
df1.show()
# Then use explode to expand the list into one row per element, in a new column named explode
df2 = df1.withColumn('explode',F.explode(df1.list_column))
df2.show()

Second usage: splitting a string on a separator and exploding the result
df = spark.createDataFrame([('a,b',),('c,d',),('e,f',)],['str',])
df.show()
# First split the string into an array, then explode it; used together they split a string and expand it into rows
df1 = df.withColumn('explode',F.explode(F.split(df.str,',')))
df1.show()

Third usage: exploding a map column
df = spark.createDataFrame([({'a':'b'},)],['map',])
df.show()
# Expand the map into two columns, key and value
df1 = df.select(F.explode(df.map).alias('key','value'))
df1.show()
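
This map form ties back to the udf example in section 1.1: assuming df17 (with its MapType dict_column) is still in scope, the same call expands each row's dict into key/value columns:
# Expand the dict built by the udf into separate key and value columns
df17.select(df17.name, F.explode(df17.dict_column).alias('key', 'value')).show()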
