美文网首页
spark基本Transform算子

spark基本Transform算子

作者: 小月半会飞 | 来源:发表于2019-03-05 15:06 被阅读0次

一、单个RDD的操作

1、map、mapPartition、mapPartitionsWithIndex

map:以一条记录为单位进行操作,返回一条数据
map((_,1))
map(x =>{
返回值:单个值或者键值对
})
mapPartition:以分区为单位进行操作
mapPartitionsWithIndex:以分区为单位进行操作,与mapPartition不同的是带下标

2、flatMap:输入一条数据,返回多条数据

比map多了一个flatten过程

Val arr=Array(“hello hadoop”,”hello hive”,”hello spark ”)
Val map=arr.map(_.split(“ ”)// Array(Array(hello,hadoop),Array(hello,hive),Array(hello,spark))
Map.flatten     //Array(hello,hadoop,hello,hive,hello,spark)
Arr.flatMap(_.split(“ ”))    // Array(hello,hadoop,hello,hive,hello,spark)

3、filter:过滤器

rdd.filter(_.trim.length>0)

4、sample(withReplacement:boolean(是否放回抽取了的元素,true放回,false不放回),franction:Double(抽取比例),seed:Long(抽样算法的随机数种子,默认为随机数,可以省略)):随机抽样

5、union、intersection

union:把两个RDD进行逻辑上的合并
intersection:求两个RDD的交集
val rdd1=sc.makeRDD(1 to 3)
val rdd2=sc.parallelize(4 until 6)
rdd1.union(rdd2)

6、sortBy和sortByKey

sortBy:手动指定排序的字段,第一个参数(_.1或者._2之类的),第二个参数可省略,表示升序,加false表示降序
sortByKey:按key进行排序,加false表示降序

7、groupByKey和reduceByKey

groupByKey:按相同的key进行分组(key,CompactBuffer(1,1,1,1))
reduceByKey:按相同的key进行分组,并且对value进行操作,假如是相加操作,reduceByKey(+)结果就是(key,4)

8、distinct:去掉重复的数据

9、coalesce、repartition、partitionBy分区函数

coalesce:有两个参数,第一个是分几个区,第二个参数是决定是否shuffle。默认为false,不可省略

1)、如果重分区的数量大于原来的数量,如果不shuffle,也就是第二个参数不为true,分区数不会发生变化,如果设置为true,增加分区会把原来的分区的数据随机分配给设置的分区中
2)、如果重分区的数量小于原来的数量,如过不shuffle,原来的分区里面的数据会整块整块的分到新的分区,如果shuffle,则会将原来的数据打乱按一定的规则重新分配给设置的分区

repartition:将数据集经过shuffle重新分为几个区,相当于coalesce(int n,true)
partitionBy:自定义分区器,重新分区
自定义分区器:

Calss MyPartition extends Partitioner{
        //分区数量为多少
        Override def numPartitions:Int=2
        //自定义分区规则
        Override def getPartition(key:Any):Int={
            If(key.HashCode()%2==0){
0
}else{
    1
}
}
}

在main函数中调用

partitionBy(new MyPartition)

Scala也有一个自有的封装的HashPartition(int n),按照hashcode值分为几个区

10、repartitionAndSortWithPartitions:重新分区并按照分区排序

Rdd. repartitionAndSortWithPartitions(new HashPartition(4))

11、glom:把分区中的元素封装到数组中

12、randomSplit:拆分RDD,将原有的RDD按照一定的比例拆分为多个分区,在数量不大的情况下,实际结果不一定准确

Val rdd=sc.parallelize(1 to 10)
Rdd. randomSplit(Array(0.1,0.2,0.3,0.4)).foreach(x => {println(x.count)})
理论结果:
1
2
3
4
Array里面的数值是多少无所谓,最后是按几个数字的比例来分配

二、多个RDD的操作

1、join、leftOuterJoin、rightOuterJoin、fullOuterJoin

Join:连接两个RDD
leftOuterJoin:左外连接
rightOuterJoin:右外连接
fullOuterJoin:全外连接
与oracle的内外连接类似

2、cogroup:将多个RDD中同一个key对应的value组合到一起

Val arr=arr1.cogroup(arr2,arr3,arr4)

3、cartesian:求笛卡尔积

假如rdd1有三个元素,rdd2有4个元素,rrd1. Cartesian(rdd2)就有12个元素

4、zip、zipWithIndex、zipWithUniqueId

Zip:两个非k,v格式的RDD,通过一一对应的格式压缩为k,v格式的RDD,要求:
    1)、分区数量需要相同
    2)、分区中的元素个数相等
    rdd1.zip(rdd2)
zipWithIndex:将RDD变成K,V格式的RDD
    K:这个RDD的元素
    V:这个元素在这个RDD中的索引
    rdd. zipWithIndex()
zipWithUniqueId:按hashCode码拉取数据
    假如rdd有两个分区,数据分别是1,2,3,4,5和6,7,8,9,10
    那么rdd. zipWithUniqueId()的结果是:
        (1,0)
        (2,2)
        (3,4)
        (4,6)
                     (5,8)
        和
        (6,1)
        (7,3)
        (8,5)
        (9,7)
        (10,9)

相关文章

网友评论

      本文标题:spark基本Transform算子

      本文链接:https://www.haomeiwen.com/subject/rnxouqtx.html