-
- RDD的创建和保存
- 1.1 textFile
从HDFS中读取一个文本文件 - 1.2 makeRDD、parallelize
都会创建一个新的ParallelCollectionRDD对象。如果makeRDD中的数据是Seq[T]结构,就会调用parallelize方法,不过makeRDD还可以将Seq[(T, Seq[String])]结构的数据转变成RDD。
makeRDD源码
parallelize源码 makeRDD和parallelize可以自己设置分区数(numSlices),默认是defaultParallelism,即Task的默认并行度
defaultParallelism源码解释
- 1.3 saveAsTextFile
将RDD保存为一个压缩文本文件。
-
- 转换算子
- 2.1 不会shuffle的
- 2.1.1 map、flatMap、mapPartitions
new一个新的MapPartitionsRDD对象,并调用Scala相应集合类的map或flatMap方法。
map和flatMap相应源码 MapPartitionsRDD类会将提供的方法作用于父RDD的每个分区。
mapPartitions类源码 相比于map或flatMap,mapPartitions多一个参数preservesPartitioning(输入true或false),判断是否保留原分区。由于MapPartitionsRDD的preservesPartitioning参数默认是false,所以map和flatMap操作后分区肯定发生了变化,而mapPartitions则可以选择保留原分区。
- 2.1.2 mapValues、flatMapValues
mapValues和flatMapValues是在PairRDDFunctions类中定义的,而map和flatMap是在RDD类中定义的,说明mapValues和flatMapValues使用场景相对更小一点,其处理的RDD形式必须是键值对。从源码来看,mapValues和flatMapValues只对键值对中的值进行处理,而且一定保证分区不变。
mapValues和flatMapValues相应源码
-
2.1.3 coalesce
对RDD进行合并分区。从源码解释上来看,coalesce是窄依赖关系,不需要shuffle(参数shuffle默认是false)。但源码中也提到,如果合并后分区数很小,很可能导致计算在较少的节点上进行,此时可以将shuffle设置为true。
coalesce源码解释
coalesce源码
接着看源码,可见coalesce具体进行过程受到shuffle参数的影响。当shuffle为false时,coalesce会创建新的CoalescedRDD类,CoalescedRDD类要求参数numPartitions大于0。coalesce主要是将父RDD合并成更少的分区,如果numPartitions大于原RDD分区数,那么新RDD分区数还是和原RDD分区数一样;如果numPartitions小于原RDD分区数,就会对partitions进行分组(group),具体实现可见PartitionCoalescer类及其throwBalls方法。
PartitionCoalescer类的throwBalls方法
- 2.1.1 map、flatMap、mapPartitions
- 2.2 会shuffle的
- 2.2.1 intersection
intersection的实现主要是利用了cogroup方法。
cogroup相应源码 cogroup会利用两个PairRDD构建新的CoGroupRDD类,CoGroupRDD类是PairRDD,key是RDD_A和RDD_B的key总集,value是一个二元组(Interable(A_value), Interable(B_value)),是A和B同一key对应的value的集合。也就是说,cogroup最后得到的结果形式是(key, (iterable(v1), interable(v2)))。让我们再回来看intersection的源码实现
intersection相应源码 intersection源码首先将输入的两个RDD转换成PairRDD形式(原RDD的值设置为key,value设置为null),然后基于这两个PairRDD构造新的CogroupRDD(构造CogroupRDD时会有shuffle),最后进行进行判断(只有interable(v1)和interable(v2)都不为空才会留下来),这样就可以得到两个RDD的交集。
不过这里还有一点有些疑惑,那就是构建CogroupRDD时是如何确定结果RDD的分区数?源码中与partition相关的代码如下,但是我不太理解,。
CoGroupRDD分区逻辑源码
- 2.2.2 subtract
subtract的实现是利用了SubtractedRDD类,调用subtractByKey方法,得到新的SubtractedRDD对象。subtract并没有用cogroup算子,而是为之新创造了一个类SubtractedRDD。原因是cogroup会将两个原RDD的key全部保留在内存中,但是SubtractedRDD中只会保留RDD_A的key,RDD_B的key会流式传过来。当RDD_A小于RDD_B时该优化可以让系统使用RDD_A的分区,而不用担心RDD_B的大小导致内存不足。源码中解释如下:
SubtractedRDD源码解释 SubtractedRDD类的计算过程如下,没太看懂,但是优化点代码中解释的比较清除——RDD_A全部存在内存中,RDD_B的key进行流式传输从而在A的key中排除B的key。这里主要是没看懂
和
。
SubtractedRDD计算过程
- 2.2.3 join、leftOuterJoin、fullOuterJoin
join类型算子都是基于cogroup算子计算的。前面已经写了,cogroup算子会将两个RDD相同的key和相应value组合在一起,最后的结果形式是(key, (iterable(v1), interable(v2)))。最后用嵌套for循环组合(iterable(v1)和(iterable(v2)就可以得到join结果。不同的join实现的不同点在于两个iterable组合时对空iterable的处理方式。 -
2.2.4 groupByKey
将RDD中相同key的元素的value全部放到一个sequence中。这个操作是很耗费资源的,因为聚集过程中并没有value的聚集,仅仅是收集起来放到一个sequence中。源码中还提到groupByKey会将所有键值对都保存在内存中,所以可能会导致OOM。
groupByKey源码解释
- 2.2.5 reduceByKey、aggregateByKey
如果是要对RDD中元素按key分组后对values进行聚合,那么就可以将groupByKey换成reduceByKey或aggregateByKey。
reduceByKey算子和aggregateByKey算子都是基于combineByKeyWithClassTag实现的
reduceByKey源码
aggregateByKey源码 可以看出相对于aggregateByKey算子,reduceByKey算子的限制更多,reduceByKey对combineByKeyWithClassTag的使用限制很多,一是输入和输出结果类型都限定为RDD元素类型;二是map端和reduce端的聚集函数都是一样的。
补充:aggregateByKey还有个优势在于combOp的输入参数类型不需要一致,也就是说,使用reduceByKey的话,map聚合后的value得和没聚合前的value类型一样,但是aggregateByKey就没这样的要求。比如,如果要同时计算用户的最大收入和最小收入,用aggregateByKey的话就可以直接对<用户, 收入>进行操作,最后得到<用户, <最大收入, 最小收入>>,但是用reduceByKey就得先将原数据形式改为<用户, <收入, 收入>>,得保证和最终想得到的格式<用户, <最大收入, 最小收入>>一样。
至于combineByKeyWithClassTag,我个人觉得就是对MapReduce中的combine函数的实现。
- 2.2.1 intersection
-
- 行动算子
-
3.1 collect
collect会调用sc.runJob将数据全部拉取到driver端存在一个Array中。
collect源码
- 3.2 count、countByKey、countByValue
count调用sc.runJob将数据拉回driver端并求和
countByKey会将原PairRDD的值变成1L,然后利用reduceByKey和collect得到结果Map
countByValue会将原RDD变成(value, null),然后基于countByKey得到结果Map
count源码
countByKey源码
countByValue源码 后续再研究
,这里挖个坑。
-
3.3 foreach、foreachPartition
先贴上源码:
foreach和foreachPartition源码
从源码中可以看出,二者最大的区别在于对传入方法f使用。前者是方法f作用于单个RDD元素,后者是方法f作用于整个RDD分区
- 3.4 first、take、top
take(num)是取回RDD的前num个元素,工作方式是先扫描一个工作分区,然后根据该分区返回结果评估是否还需要扫描其他分区以满足num数量。注意,如果RDD是"Nothing"或"Null",则会返回异常。
take源码 first就是take(1)
first源码 top(num)是返回RDD中的最大的num个元素,基于takeOrdered实现。而takeOrdered是返回RDD中的最小的num个元素。
takeOrdered源码 在takeOrdered实现中,RDD的每个分区排序存在一个iterator中,然后将所有分区的iterator拼接起来排序得到topN的数据(那如果RDD数量很大不是容易造成driver端的OOM么?看着这个算子要慎用!)
-
3.5 sortBy
调用sortByKey方法,创建一个新的排好序的ShuffledRDD。这里根据名字猜测一下,既然新的RDD是ShuffledRDD,说明该RDD的创建应该是进行了shuffle,所以最后的结果应该是全局排序的。
sortByKey源码
网友评论