- Purpose: rows-to-column and column-to-rows transformations of data
- Data preparation:
df = spark.createDataFrame([(1,"age","23|45|67|32"),(3,"score","90|91|92|93")]).toDF("id","typ","ls")
df.show()
+---+-----+-----------+
| id|  typ|         ls|
+---+-----+-----------+
|  1|  age|23|45|67|32|
|  3|score|90|91|92|93|
+---+-----+-----------+
Column to rows (explode one delimited column into multiple rows):
df2 = df.withColumn("xx",explode(split("ls","\\|"))).drop("ls")
df2.show()
+---+-----+---+
| id|  typ| xx|
+---+-----+---+
|  1|  age| 23|
|  1|  age| 45|
|  1|  age| 67|
|  1|  age| 32|
|  3|score| 90|
|  3|score| 91|
|  3|score| 92|
|  3|score| 93|
+---+-----+---+
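If the position of each element inside the original string matters, posexplode is a drop-in alternative to explode that also emits the element's index. A minimal sketch reusing the df built above (the column names pos and xx are illustrative choices, not from the original post):

from pyspark.sql.functions import posexplode, split

# posexplode behaves like explode but additionally returns each
# element's position in the array, so the original order survives
df3 = df.select("id", "typ", posexplode(split("ls", "\\|")).alias("pos", "xx"))
df3.show()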
Rows to column (collect the values back into one delimited column):
df2.groupBy("id","typ").agg(concat_ws("|",collect_set("xx")).alias("ls2")).show()
+---+-----+-----------+
| id|  typ|        ls2|
+---+-----+-----------+
|  3|score|92|93|91|90|
|  1|  age|23|45|67|32|
+---+-----+-----------+
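Note that collect_set deduplicates and gives no ordering guarantee, which is why ls2 comes back as 92|93|91|90 rather than 90|91|92|93. When duplicates must be preserved, collect_list is the usual substitute; a minimal sketch reusing df2 from above (element order within each group is still not guaranteed after the shuffle):

from pyspark.sql.functions import collect_list, concat_ws

# collect_list keeps duplicate values, unlike collect_set;
# ordering within each group is still not guaranteed
df2.groupBy("id", "typ").agg(concat_ws("|", collect_list("xx")).alias("ls2")).show()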
Complete code:
# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, concat_ws, collect_set

if __name__ == "__main__":
    spark = SparkSession.builder.master("local").getOrCreate()
    # data preparation
    df = spark.createDataFrame([(1,"age","23|45|67|32"),(3,"score","90|91|92|93")]).toDF("id","typ","ls")
    df.show()
    # column to rows: split the delimited string and explode into one row per element
    df2 = df.withColumn("xx", explode(split("ls", "\\|"))).drop("ls")
    df2.show()
    # rows to column: collect the values back into a single "|"-joined string
    df2.groupBy("id", "typ").agg(concat_ws("|", collect_set("xx")).alias("ls2")).show()
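The column-to-rows step can also be written in Spark SQL, since explode is available as a SQL generator function. A sketch under the assumption that df from the script above is registered as a temp view named t (the view name is an illustrative choice):

df.createOrReplaceTempView("t")
# '\\|' escapes the pipe so split treats it literally, not as regex alternation
spark.sql(r"SELECT id, typ, explode(split(ls, '\\|')) AS xx FROM t").show()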