dataframe

Author: hehehehe | Published 2021-09-26 15:16

https://github.com/spark-examples/pyspark-examples
pivot

df = spark.createDataFrame([('Joe', '70000'), ('Joe', '70000'),( 'Henry', '80000')],
                           ['Name', 'Sallary'])
+-----+-------+
| Name|Sallary|
+-----+-------+
|  Joe|  70000|
|  Joe|  70000|
|Henry|  80000|
+-----+-------+
df.selectExpr("row_number() over (order by Name) as a","Name","Sallary").show()
+---+-----+-------+
|  a| Name|Sallary|
+---+-----+-------+
|  1|Henry|  80000|
|  2|  Joe|  70000|
|  3|  Joe|  70000|
+---+-----+-------+
df.selectExpr("concat(cast(a as string),'aaa')").show()

from pyspark.sql.functions import first
df.groupBy().pivot("Name").agg(first("Sallary")).show()
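A more typical pivot groups by one column and pivots another; a minimal sketch with a hypothetical sales DataFrame (names are illustrative, not from the snippet above):

from pyspark.sql.functions import sum as _sum

sales = spark.createDataFrame(
    [("Banana", "USA", 1000), ("Banana", "China", 400), ("Carrots", "USA", 1500)],
    ["Product", "Country", "Amount"])

# one row per Product, one column per Country, summed Amount in each cell
sales.groupBy("Product").pivot("Country").agg(_sum("Amount")).show()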
    joined_group_df = spark.sql("""
        select id,collect_set(address)[0] as address,collect_set(address_normal)[0] as address_normal,
        concat_ws(",",collect_set(id2)) as id2,
        collect_set(address2)[0] as address2,
        collect_set(address_normal2)[0] as address_normal2,
        concat_ws(",",collect_set(longitude2)) as longitude2,
        concat_ws(",",collect_set(latitude2)) as latitude2
        from joined_df group by id
    """).cache()
dataframe udf
from pyspark.sql.types import StructType, StringType, StructField, IntegerType, ArrayType
from pyspark.sql.functions import udf
from shapely.geometry import Point, MultiPoint  # used by get_centroid below

@udf(returnType=ArrayType(StringType()))
def lonlat_split(address: str):
    return address[:-1].split(",")

@udf(returnType=StringType())
def get_centroid(lons: list, lats: list):
    points = []
    for lon, lat in zip(lons, lats):
        points.append(Point(float(lon), float(lat)))
    return MultiPoint(points).centroid.wkt

df2 = df.withColumn("lon",lonlat_split(df.district_pos)[0])
df2 = df2.withColumn("lat",lonlat_split(df.district_pos)[1])
df2.select(['lon','lat']).show(2)
+----------+---------+
|       lon|      lat|
+----------+---------+
|109.268983|22.687409|
|108.810373|23.217889|
+----------+---------+
only showing top 2 rows
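get_centroid is defined above but never called in these notes; a minimal sketch, assuming the lon/lat values are first collected into lists per group (the grouping column district is hypothetical):

from pyspark.sql import functions as fn

centroids = (df2.groupBy("district")
                .agg(fn.collect_list("lon").alias("lons"),
                     fn.collect_list("lat").alias("lats"))
                .withColumn("centroid_wkt", get_centroid("lons", "lats")))
centroids.show(2, truncate=False)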
from pyspark.sql import functions as fn
df.groupBy(['_1']).agg(fn.collect_list('_2').alias('list')).show(5)
+---+------+
| _1|  list|
+---+------+
|  1|[2, 3]|
|  2|   [3]|
+---+------+
zipWithIndex
df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])

In [50]: df1 = df0.rdd.zipWithIndex()
In [51]: df1.take(3)
Out[51]: [(Row(col1=1), 0), (Row(col1=2), 1), (Row(col1=3), 2)]

In [56]: df1.map(lambda x:Row(a=x[0]['col1'],id=x[1])).toDF().show()
+---+---+
|  a| id|
+---+---+
|  1|  0|
|  2|  1|
|  3|  2|
|  1|  3|
|  2|  4|
|  3|  5|
+---+---+
rdd_zip = rdd.zipWithIndex().map(lambda x: (x[1], *x[0]))
rdd_zip = rdd.zipWithIndex().map(lambda x: {"id": x[1], **x[0].asDict()})
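Either variant can be turned back into a DataFrame; a minimal sketch for the tuple form, assuming rdd here is a DataFrame's df.rdd:

df_with_id = rdd_zip.toDF(["id"] + df.columns)
df_with_id.show()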

join
e1.alias('ee1').join(e2.alias('ee2'), [col('ee1.emp_id') == col('ee2.emp_id')], 'left') \
  .select(col('ee1.emp_id'), col('ee2.emp_id').alias('id2')).show()
+------+----+
|emp_id| id2|
+------+----+
|     6|null|
|     5|null|
|     1|   1|
|     3|null|
|     2|   2|
|     4|null|
+------+----+

r1 = sc.parallelize([(1,100),(2,200),(3,300),(4,400)])
r2 = sc.parallelize([(2,(22,0)),(4,(44,1)),(5,(55,0)),(6,(66,1))])
r3 = sc.parallelize([(2,22,0),(4,44,1),(5,55,0),(6,66,1)])

>>> r1.leftOuterJoin(r2).collect()
[(1, (100, None)), (2, (200, (22, 0))), (3, (300, None)), (4, (400, (44, 1)))]

>>> df1 = spark.createDataFrame(r1,['id','amount'])
>>> df1.show()
+---+------+
| id|amount|
+---+------+
|  1|   100|
|  2|   200|
|  3|   300|
|  4|   400|
+---+------+

>>> df3 = spark.createDataFrame(r3,['id','weight','exist'])
>>> df3.show()
+---+------+-----+
| id|weight|exist|
+---+------+-----+
|  2|    22|    0|
|  4|    44|    1|
|  5|    55|    0|
|  6|    66|    1|
+---+------+-----+

SELECT a.* FROM product a LEFT JOIN product_details b
ON a.id=b.id AND b.weight!=44 AND b.exist=0
WHERE b.id IS NULL;

>>> df4 = df1.join(df3,[df1.id == df3.id , df3.weight!=44 , df3.exist==0],'left')
>>> df4.filter('weight is null' ).show()
+---+------+----+------+-----+
| id|amount|  id|weight|exist|
+---+------+----+------+-----+
|  1|   100|null|  null| null|
|  3|   300|null|  null| null|
|  4|   400|null|  null| null|
+---+------+----+------+-----+
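The same "unmatched left rows only" result (and the SELECT a.* column set) can be expressed directly with a left_anti join, which avoids the null filter and the duplicated join columns:

df1.join(df3, [df1.id == df3.id, df3.weight != 44, df3.exist == 0], 'left_anti').show()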

agg
https://www.cnblogs.com/seekerjunyu/p/14016240.html

1. agg after groupBy
from pyspark.sql.functions import first, collect_list, mean
df.groupBy("ID").agg(mean("P"), first("index"),
                     first("xinf"), first("xup"), 
                     first("yinf"), first("ysup"), 
                     collect_list("M"))
2. agg on the whole DataFrame
df.agg(mean('label')).show()
+------------------+
|        avg(label)|
+------------------+
|0.7411402157164869|
+------------------+
agglist = [mean(x) for x in tips_.columns]
agglist
Out[109]: [Column<b'avg(total_bill)'>, Column<b'avg(tip)'>, Column<b'avg(size)'>]
tips_.agg(*agglist).show()
+------------------+----------------+-----------------+
|   avg(total_bill)|        avg(tip)|        avg(size)|
+------------------+----------------+-----------------+
|19.785942643392282|2.99827868821191|2.569672131147541|
+------------------+----------------+-----------------+
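The auto-generated avg(...) column names are awkward to reference later; the same list comprehension can alias each aggregate:

agglist = [mean(x).alias('avg_' + x) for x in tips_.columns]
tips_.agg(*agglist).show()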

df_group = df.groupBy('building') \
    .agg(fn.collect_list('hn_id').alias('hn_id'), fn.collect_list('building_num').alias('building_nums'))
df_group.foreach(gen_cp)
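gen_cp is not defined in these notes; a hypothetical placeholder, just to show the shape of the callback foreach expects (one grouped Row per call, executed on the executors):

def gen_cp(row):
    # row carries the aggregated columns: building, hn_id (list), building_nums (list)
    building, hn_ids, building_nums = row['building'], row['hn_id'], row['building_nums']
    # ... write whatever pairing/output is needed to an external store here ...
    print(building, len(hn_ids), len(building_nums))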
column

from pyspark.sql.functions import concat_ws, current_date, when, col, lit, udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def strQ2B(ustring):
    if ustring:
        ss = []
        for s in ustring:
            rstring = ""
            for uchar in s:
                inside_code = ord(uchar)
                if inside_code == 12288:  # full-width space: convert directly to ASCII space
                    inside_code = 32
                elif 65281 <= inside_code <= 65374:  # other full-width characters: shift by the fixed offset
                    inside_code -= 65248
                rstring += chr(inside_code)
            ss.append(rstring)
        return "".join(ss)
    return ustring
df2 = df2.withColumn("address", strQ2B(col("address")))

df2 = df.withColumn("salary",col("salary").cast("Integer"))
df3 = df.withColumn("salary",col("salary")*100)
df4 = df.withColumn("CopiedColumn",col("salary")* -1)
df5 = df.withColumn("Country", lit("USA"))
df.withColumnRenamed("gender","sex") 
df.sort(df.department.asc(),df.state.desc()).show(truncate=False)
df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy(df.level).orderBy(df.age)))  # sf = pyspark.sql.functions
df.withColumn("name", concat_ws(",","firstname",'lastname')) .show()
df.withColumn("current_date", current_date()) .show()
df.withColumn("grade", \
   when((df.salary < 4000), lit("A")) \
     .when((df.salary >= 4000) & (df.salary <= 5000), lit("B")) \
     .otherwise(lit("C")) \
  ).show()

df4 = spark.sql("SELECT STRING(age),BOOLEAN(isGraduated),DATE(jobStartDate) from CastExample")

df.withColumn("salary",df.salary.cast('double')).printSchema()    
df.withColumn("salary",df.salary.cast(DoublerType())).printSchema()    
df.withColumn("salary",col("salary").cast('double')).printSchema()
groupby


def generate_parent_children(row):
    result = []
    building, hnid_nums = row[0], row[1]
    parent = []
    children = []
    for hnid_num in hnid_nums:
        hnid, num = hnid_num.split('|')
        if num == '-1':
            parent.append(hnid)
        else:
            children.append(hnid)
    if parent and children:
        if len(parent) > 1:
            print("+" * 10)
        child_str = ",".join(children)
        parent_str = ",".join(parent)
        for p in parent:
            result.append((p, None, child_str))
        for c in children:
            result.append((c, parent_str, None))
    return result

rdd = df.rdd.map(lambda x: (x['building'], x['hn_id'] + '|' + (x['building_num'] if x['building_num'] else "-1")))
parent_children_rdd = rdd.groupByKey().flatMap(generate_parent_children)
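The flattened tuples can be turned back into a DataFrame; an explicit schema avoids type-inference problems with the None fields:

from pyspark.sql.types import StructType, StructField, StringType

pc_schema = StructType([StructField("hn_id", StringType(), True),
                        StructField("parent", StringType(), True),
                        StructField("children", StringType(), True)])
parent_children_df = spark.createDataFrame(parent_children_rdd, schema=pc_schema)
parent_children_df.show(truncate=False)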

select

from pyspark.sql.functions import approx_count_distinct,collect_list
from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
from pyspark.sql.functions import variance,var_samp, var_pop
from pyspark.sql.functions import col,expr

df.select(df.colRegex("`^.*name*`")).show()
# Using split() from pyspark.sql.functions
split_col = pyspark.sql.functions.split(df['dob'], '-')
df3 = df.select("firstname", "middlename", "lastname", "dob", split_col.getItem(0).alias('year'),
                split_col.getItem(1).alias('month'), split_col.getItem(2).alias('day'))
df3.show(truncate=False)

from pyspark.sql.functions import col,expr
data=[("2019-01-23",1),("2019-06-24",2),("2019-09-20",3)]
spark.createDataFrame(data).toDF("date","increment") \
    .select(col("date"),col("increment"), \
      expr("add_months(to_date(date,'yyyy-MM-dd'),cast(increment as int))").alias("inc_date")) \
    .show()

print("approx_count_distinct: " + \
      str(df.select(approx_count_distinct("salary")).collect()[0][0]))

print("avg: " + str(df.select(avg("salary")).collect()[0][0]))

df.select(collect_list("salary")).show(truncate=False)

df.select(collect_set("salary")).show(truncate=False)

df2 = df.select(countDistinct("department", "salary"))
df2.show(truncate=False)
print("Distinct Count of Department &amp; Salary: "+str(df2.collect()[0][0]))

print("count: "+str(df.select(count("salary")).collect()[0]))
df.select(first("salary")).show(truncate=False)
df.select(last("salary")).show(truncate=False)
df.select(kurtosis("salary")).show(truncate=False)
df.select(max("salary")).show(truncate=False)
df.select(min("salary")).show(truncate=False)
df.select(mean("salary")).show(truncate=False)
df.select(skewness("salary")).show(truncate=False)
df.select(stddev("salary"), stddev_samp("salary"), \
    stddev_pop("salary")).show(truncate=False)
df.select(sum("salary")).show(truncate=False)
df.select(sumDistinct("salary")).show(truncate=False)
df.select(variance("salary"),var_samp("salary"),var_pop("salary")) \
  .show(truncate=False)

array_string

from pyspark.sql.functions import col, concat_ws

+----------------+------------------+------------+
|name            |languagesAtSchool |currentState|
+----------------+------------------+------------+
|James,,Smith    |[Java, Scala, C++]|CA          |
|Michael,Rose,   |[Spark, Java, C++]|NJ          |
|Robert,,Williams|[CSharp, VB]      |NV          |
+----------------+------------------+------------+

df2 = df.withColumn("languagesAtSchool",
   concat_ws(",",col("languagesAtSchool")))
df2.printSchema()
df2.show(truncate=False)


df.createOrReplaceTempView("ARRAY_STRING")
spark.sql("select name, concat_ws(',',languagesAtSchool) as languagesAtSchool," + \
    " currentState from ARRAY_STRING") \
    .show(truncate=False)
ArrayType
from pyspark.sql.types import StringType, ArrayType,StructType,StructField
arrayCol = ArrayType(StringType(),False)

data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]

schema = StructType([ 
    StructField("name",StringType(),True), 
    StructField("languagesAtSchool",ArrayType(StringType()),True), 
    StructField("languagesAtWork",ArrayType(StringType()),True), 
    StructField("currentState", StringType(), True), 
    StructField("previousState", StringType(), True) 
  ])

df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()

from pyspark.sql.functions import explode
df.select(df.name,explode(df.languagesAtSchool)).show()
+----------------+------+
|            name|   col|
+----------------+------+
|    James,,Smith|  Java|
|    James,,Smith| Scala|
|    James,,Smith|   C++|
|   Michael,Rose,| Spark|
|   Michael,Rose,|  Java|
|   Michael,Rose,|   C++|
|Robert,,Williams|CSharp|
|Robert,,Williams|    VB|
+----------------+------+

from pyspark.sql.functions import split
df.select(split(df.name,",").alias("nameAsArray")).show()

from pyspark.sql.functions import array
df.select(df.name,array(df.currentState,df.previousState).alias("States")).show()

from pyspark.sql.functions import array_contains
df.select(df.name,array_contains(df.languagesAtSchool,"Java")
    .alias("array_contains")).show()
broadcast
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+

def state_convert(code):
    return broadcastStates.value[code]

result = df.rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).toDF(columns)
result.show(truncate=False)
+---------+--------+-------+----------+
|firstname|lastname|country|state     |
+---------+--------+-------+----------+
|James    |Smith   |USA    |California|
|Michael  |Rose    |USA    |New York  |
|Robert   |Williams|USA    |California|
|Maria    |Jones   |USA    |Florida   |
+---------+--------+-------+----------+
# Broadcast variable on filter

filteredDf = df.where(df['state'].isin(list(broadcastStates.value.keys())))
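An alternative to the RDD map above is a UDF that reads the broadcast dict directly on the DataFrame; a minimal sketch (state_convert_udf is not from the original notes):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def state_convert_udf(code):
    # look up the full state name in the broadcast dict on each executor
    return broadcastStates.value.get(code)

df.withColumn("state", state_convert_udf(col("state"))).show(truncate=False)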
column
df.sort(df.fname.asc()).show()
#between
df.filter(df.id.between(100,300)).show()

#contains
df.filter(df.fname.contains("Cruise")).show()

#startswith, endswith()
df.filter(df.fname.startswith("T")).show()
df.filter(df.fname.endswith("Cruise")).show()
#isNull & isNotNull
df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()

#like , rlike
df.select(df.fname,df.lname,df.id) \
  .filter(df.fname.like("%om")) 

#substr
df.select(df.fname.substr(1,2).alias("substr")).show()

#isin
li=["100","200"]
df.select(df.fname,df.lname,df.id) \
  .filter(df.id.isin(li)) \
  .show()

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
schema = StructType([
        StructField('name', StructType([
            StructField('fname', StringType(), True),
            StructField('lname', StringType(), True)])
       ),
        StructField('languages', ArrayType(StringType()),True),
        StructField('properties', MapType(StringType(),StringType()),True)
     ])
+--------------+---------------+-----------------------------+
|          name|      languages|                   properties|
+--------------+---------------+-----------------------------+
| [James, Bond]|     [Java, C#]|[eye -> brown, hair -> black]|
|  [Ann, Varsa]| [.NET, Python]|[eye -> black, hair -> brown]|
|[Tom Cruise, ]|[Python, Scala]|   [eye -> grey, hair -> red]|
|  [Tom Brand,]|   [Perl, Ruby]| [eye -> blue, hair -> black]|
+--------------+---------------+-----------------------------+
#getItem()
df.select(df.languages.getItem(1)).show()

df.select(df.properties.getItem("hair")).show()

#getField from Struct or Map
df.select(df.properties.getField("hair")).show()

df.select(df.name.getField("fname")).show()
#dropFields
#from pyspark.sql.functions import col
#df.withColumn("name1",col("name").dropFields(["fname"])).show()

#withField
#from pyspark.sql.functions import lit
#df.withColumn("name",df.name.withField("fname",lit("AA"))).show()

#from pyspark.sql import Row
#from pyspark.sql.functions import lit
#df = spark.createDataFrame([Row(a=Row(b=1, c=2))])
#df.withColumn('a', df['a'].withField('b', lit(3))).select('a.b').show()
        
#from pyspark.sql import Row
#from pyspark.sql.functions import col, lit
#df = spark.createDataFrame([
#Row(a=Row(b=1, c=2, d=3, e=Row(f=4, g=5, h=6)))])
#df.withColumn('a', df['a'].dropFields('b')).show()

from pyspark.sql.functions import col,lit,create_map
df = df.withColumn("propertiesMap",create_map(
        lit("salary"),col("salary"),
        lit("location"),col("location")
        )).drop("salary","location")
create dataframe
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import *

columns = ["language","users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)

dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()

dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()

dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
dfFromRDD2.printSchema()

dfFromData2 = spark.createDataFrame(data).toDF(*columns)
dfFromData2.printSchema()     

rowData = map(lambda x: Row(*x), data) 
dfFromData3 = spark.createDataFrame(rowData,columns)
dfFromData3.printSchema()
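StructType/StructField are imported above but unused; a minimal sketch of building the same DataFrame with an explicit schema:

schema = StructType([
    StructField("language", StringType(), True),
    StructField("users_count", StringType(), True)
])
dfFromData4 = spark.createDataFrame(data, schema=schema)
dfFromData4.printSchema()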

date current_timestamp

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data=[["1"]]
df=spark.createDataFrame(data,["id"])

from pyspark.sql.functions import *

#current_date() & current_timestamp()
df.withColumn("current_date",current_date()) \
  .withColumn("current_timestamp",current_timestamp()) \
  .show(truncate=False)

#SQL
spark.sql("select current_date(), current_timestamp()") \
     .show(truncate=False)

# Date & Timestamp into custom format
df.withColumn("date_format",date_format(current_date(),"MM-dd-yyyy")) \
  .withColumn("to_timestamp",to_timestamp(current_timestamp(),"MM-dd-yyyy HH mm ss SSS")) \
  .show(truncate=False)

#SQL
spark.sql("select date_format(current_date(),'MM-dd-yyyy') as date_format ," + \
          "to_timestamp(current_timestamp(),'MM-dd-yyyy HH mm ss SSS') as to_timestamp") \
     .show(truncate=False)


from pyspark.sql.functions import *

df=spark.createDataFrame([["1"]],["id"])
df.select(current_date().alias("current_date"), \
      date_format(current_date(),"yyyy MM dd").alias("yyyy MM dd"), \
      date_format(current_timestamp(),"MM/dd/yyyy hh:mm").alias("MM/dd/yyyy"), \
      date_format(current_timestamp(),"yyyy MMM dd").alias("yyyy MMMM dd"), \
      date_format(current_timestamp(),"yyyy MMMM dd E").alias("yyyy MMMM dd E") \
   ).show()

#SQL

spark.sql("select current_date() as current_date, "+
      "date_format(current_timestamp(),'yyyy MM dd') as yyyy_MM_dd, "+
      "date_format(current_timestamp(),'MM/dd/yyyy hh:mm') as MM_dd_yyyy, "+
      "date_format(current_timestamp(),'yyyy MMM dd') as yyyy_MMMM_dd, "+
      "date_format(current_timestamp(),'yyyy MMMM dd E') as yyyy_MMMM_dd_E").show()
date functions
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data,["id","input"])
df.show()

from pyspark.sql.functions import *

#current_date()
df.select(current_date().alias("current_date")
  ).show(1)

#date_format()
df.select(col("input"), 
    date_format(col("input"), "MM-dd-yyyy").alias("date_format") 
  ).show()

#to_date()
df.select(col("input"), 
    to_date(col("input"), "yyy-MM-dd").alias("to_date") 
  ).show()

#datediff()
df.select(col("input"), 
    datediff(current_date(),col("input")).alias("datediff")  
  ).show()

#months_between()
df.select(col("input"), 
    months_between(current_date(),col("input")).alias("months_between")  
  ).show()
#trunc()
df.select(col("input"), 
    trunc(col("input"),"Month").alias("Month_Trunc"), 
    trunc(col("input"),"Year").alias("Month_Year"), 
    trunc(col("input"),"Month").alias("Month_Trunc")
   ).show()

#add_months() , date_add(), date_sub()

df.select(col("input"), 
    add_months(col("input"),3).alias("add_months"), 
    add_months(col("input"),-3).alias("sub_months"), 
    date_add(col("input"),4).alias("date_add"), 
    date_sub(col("input"),4).alias("date_sub") 
  ).show()

#

df.select(col("input"), 
     year(col("input")).alias("year"), 
     month(col("input")).alias("month"), 
     next_day(col("input"),"Sunday").alias("next_day"), 
     weekofyear(col("input")).alias("weekofyear") 
  ).show()

df.select(col("input"),  
     dayofweek(col("input")).alias("dayofweek"), 
     dayofmonth(col("input")).alias("dayofmonth"), 
     dayofyear(col("input")).alias("dayofyear"), 
  ).show()
data=[["1","02-01-2020 11 01 19 06"],["2","03-01-2019 12 01 19 406"],["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","input"])
df2.show(truncate=False)

#current_timestamp()
df2.select(current_timestamp().alias("current_timestamp")
  ).show(1,truncate=False)

#to_timestamp()
df2.select(col("input"), 
    to_timestamp(col("input"), "MM-dd-yyyy HH mm ss SSS").alias("to_timestamp") 
  ).show(truncate=False)


#hour, minute,second
data=[["1","2020-02-01 11:01:19.06"],["2","2019-03-01 12:01:19.406"],["3","2021-03-01 12:01:19.406"]]
df3=spark.createDataFrame(data,["id","input"])

df3.select(col("input"), 
    hour(col("input")).alias("hour"), 
    minute(col("input")).alias("minute"),
    second(col("input")).alias("second") 
  ).show(truncate=False)

window func

simpleData = (("James", "Sales", 3000), \
              ("Michael", "Sales", 4600), \
              ("Robert", "Sales", 4100), \
              ("Maria", "Finance", 3000), \
              ("James", "Sales", 3000), \
              ("Scott", "Finance", 3300), \
              ("Jen", "Finance", 3900), \
              ("Jeff", "Marketing", 3000), \
              ("Kumar", "Marketing", 2000), \
              ("Saif", "Sales", 4100) \
              )

columns = ["employee_name", "department", "salary"]

df = spark.createDataFrame(data=simpleData, schema=columns)

df.printSchema()
df.show(truncate=False)

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

windowSpec = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_number", row_number().over(windowSpec)) \
    .show(truncate=False)

from pyspark.sql.functions import rank

df.withColumn("rank", rank().over(windowSpec)) \
    .show()

from pyspark.sql.functions import dense_rank

df.withColumn("dense_rank", dense_rank().over(windowSpec)) \
    .show()

from pyspark.sql.functions import percent_rank

df.withColumn("percent_rank", percent_rank().over(windowSpec)) \
    .show()

from pyspark.sql.functions import ntile

df.withColumn("ntile", ntile(2).over(windowSpec)) \
    .show()

from pyspark.sql.functions import cume_dist

df.withColumn("cume_dist", cume_dist().over(windowSpec)) \
    .show()

from pyspark.sql.functions import lag

df.withColumn("lag", lag("salary", 2).over(windowSpec)) \
    .show()

from pyspark.sql.functions import lead

df.withColumn("lead", lead("salary", 2).over(windowSpec)) \
    .show()

windowSpecAgg = Window.partitionBy("department")
from pyspark.sql.functions import col, avg, sum, min, max, row_number

df.withColumn("row", row_number().over(windowSpec)) \
    .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
    .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
    .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
    .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
    .where(col("row") == 1).select("department", "avg", "sum", "min", "max") \
    .show()
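A running total per department can be added by giving the ordered window an explicit row frame; a minimal sketch:

runningSpec = Window.partitionBy("department").orderBy("salary") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("running_sum", sum(col("salary")).over(runningSpec)).show()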

