前面已经给大家讲过RDD原理,今天就给大家说说RDD的转换算子有哪些,以便大家理解。
对于转换操作,RDD的所有转换都不会直接计算结果,仅记录作用于RDD上的操作,当遇到动作算子(Action)时才会进行真正计算,理解了转换算子的用途,那么接下来就不能理解哪些是转换算子了,通俗的说只要没有经过聚合或计算的算子,都可以当作转换算子。
常用转换算子
map
- 对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD
- 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应
- 输入分区与输出分区一一对应
案例1:
val a=sc.parallelize(1 to 9)
val b=a.map(x=>x*2)
a.collect
b.collect
解析:将原RDD中每个元素都乘以2来产生一个新的RDD
案例2:
val a=sc.parallelize(List("dog","tiger","pig","cat"))
val b=a.map(x=>(x,1))
b.collect
解析:map把普通RDD变成PairRDD
filter
- 对元素进行过滤,对每个元素应用指定函数,返回值为true的元素保留在新的RDD中
案例:
val a=sc.parallelize(1 to 10)
a.filter(_%2==0).collect
a.filter(_<4).collect
//map&filter
val rdd=sc.parallelize(List(1 to 6))
val mapRdd=rdd.map(_*2)
mapRdd.collect
val filterRdd=mapRdd.filter(_>5)
filterRdd.collect
mapValues
- 原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素,仅适用于PairRDD
案例:
val a=sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"))
val b=a.map(x=>(x.length,x))
b.mapValues("x"+_+"x").collect
//输出结果:
Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))
distinct
- 将当前RDD进行去重后,返回一个新的RDD
案例:
val dis = sc.parallelize(List(1,2,3,4,5,6,9,9,2,6))
dis.distinct.collect
//1,2,3,4,5,6,9
reduceByKey(func: (x, y) => v)
- 根据Key值将相同Key的元组的值用func进行计算,返回新的RDD
案例:
val a = sc.parallelize(List("dog", "salmon", "pig"), 3)
val f = a.map(x=>(x.length,x))
f.reduceByKey((a,b)=>(a+b)).collect
或
f.reduceByKey(_+_).collect
//Array[(Int, String)] = Array((6,salmon), (3,dogpig))
groupByKey(): RDD[(K, Iterable[V])]
- 将相同Key的值进行聚集,输出一个(K, Iterable[V])类型的RDD
案例:
val a = sc.parallelize(List("dog", "salmon", "pig"), 3)
val f = a.map(x=>(x.length,x))
f.groupByKey.collect
// 结果
Array[(Int, Iterable[String])] = Array((6,CompactBuffer(salmon)), (3,CompactBuffer(dog, pig)))
sortByKey
- 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
案例:
val a = sc.parallelize(List("dog", "salmon", "pig"), 3)
val f = a.map(x=>(x.length,x))
f.sortByKey().collect
// 结果
Array[(Int, String)] = Array((3,dog), (3,pig), (6,salmon))
sortBy
- 底层实现还是使用sortByKey,只不过使用自定义生成的新key进行排序,默认升序。
案例:
val a = sc.parallelize(List(10,2,3,5,13,6,3))
println(a.sortBy(x => x).collect().mkString(" ")) //升序
println(a.sortBy(x => x * (-1)).collect().mkString(" ")) //降序
// 升序 2 3 3 5 6 10 13
// 降序 13 10 6 5 3 3 2
partitionBy
- 根据设置的分区器重新将pairRDD进行分区操作,返回新的RDD。
- 如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区,
否则会生成ShuffleRDD,即会产生shuffle过程。
案例:
val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
println(rdd2.partitions.size)
combineByKey
(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int)
- 根据key分别使用CreateCombiner和mergeValue进行相同key的数值聚集,通过mergeCombiners将各个分区最终的结果进行聚集。
aggregateByKey
(zeroValue: U, partitioner:Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U)
- 通过seqOp函数将每一个分区里面的数据和初始值迭代带入函数返回最终值,comOp将每一个分区返回的最终值根据key进行合并操作。
foldByKey
- aggregateByKey的简化操作,seqop和combop相同
flatMap
- 将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。
案例:
val rdd1 = sc.parallelize(List(List(1,2,3),List(4,5,6),List(7,8,9)))
val rdd2 = rdd1.flatMap(x=>x.mkString(" "))
println(rdd2.collect().mkString(" "))
//1 2 3 4 5 6 7 8 9
mapPartitions
- 将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。
案例:
val a = sc.parallelize(1 to 10)
val b = a.map(x=>x*2)
val rdd2 = a.mapPartitions(x=>x.map(_*2))
println(rdd2.collect().mkString(" "))
// 2 4 6 8 10 12
mapPartitionsWithIndex
- 将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。
案例:
val a = sc.parallelize(1 to 10)
val rdd2 = a.mapPartitionsWithIndex((index,items)=>(items.map(x=>(index,x))))
println(rdd2.collect().mkString(" - "))
// (0,1) - (1,2) - (1,3) - (2,4) - (2,5) - (3,6) - (4,7) - (4,8) - (5,9) - (5,10)
repartition
- 根据传入的分区数重新通过网络分区所有数据,底层调用coalesce,重型操作。
案例:
val a = sc.parallelize(1 to 10,4)
val rdd2 = a.repartition(6)
println(rdd2.partitions.size)
//coalesce 重新分区
val rdd3 = a.coalesce(6,true)
println(rdd3.partitions.size)
//rdd2: 6
//rdd3:6
repartitionAndSortWithinPartitions
- 性能要比repartition要高,在给定的partitioner内部进行排序
cogroup
- 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD,注意,如果V和W的类型相同,也不放在一块,还是单独存放。
案例:
//创建两个pairRDD
val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
val rdd1 = sc.parallelize(Array((1,4),(1,5),(3,6)))
val rdd2: RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd.cogroup(rdd1)
println(rdd2.collect().mkString(" "))
//(1,(CompactBuffer(a),CompactBuffer(4, 5)))
(2,(CompactBuffer(b),CompactBuffer()))
(3,(CompactBuffer(c),CompactBuffer(6)))
coalesce
- 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
- coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle;true为产生shuffle,false不产生shuffle。默认是false。
- 如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用
- 如果设置成true,效果和repartition一样。即repartition(numPartitions) = coalesce(numPartitions,true)
glom()
- 将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
subtract
- 计算差的一种函数去除两个RDD中相同的元素,不同的RDD将保留下来。
pipe
- 对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD,注意,如果你是本地文件系统中,需要将脚本放置到每个节点上。
sample
- 随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。(True,fraction,long),false抽样的数不放回循环里面,不会重复,true的话会重新进行随机抽样,而且会重复
案例:
val a = sc.parallelize(1 to 10)
a.sample(false,0.4,2).collect().foreach(println)
//1 5 6 7 8 9
union
- 将两个RDD中的元素进行合并,求并集,类型要匹配一样,相同的元素不去重,返回一个新的RDD
案例:
val a = sc.parallelize(1 to 5)
val b= sc.parallelize(3 to 7)
val unionRDD = a.union(b)
println(unionRDD.collect().mkString(" "))
// 1 2 3 4 5 3 4 5 6 7
intersection
- 将两个RDD中的元素进行合并,求交集,类型要匹配一样,取相同的元素,返回一个新的RDD
案例:
val a = sc.parallelize(1 to 5)
val b= sc.parallelize(3 to 7)
val unionRDD = a.intersection(b)
println(unionRDD.collect().mkString(" "))
//3 4 5
cartesian
- 做两个RDD的笛卡尔积,返回对偶的RDD,就是前一个集合中的每一个数和另外一个集合中的每一个数生成一个新的集合
案例:
val a = sc.parallelize(1 to 10)
val b= sc.parallelize(3 to 7)
val unionRDD = a.cartesian(b)
println(unionRDD.collect().mkString(" "))
// (1,3) (1,4) (1,5) (1,6) (1,7) (2,3) (3,3) (2,4) (3,4) (2,5) (3,5) (2,6) (3,6) (2,7) (3,7) (4,3) (5,3) (4,4) (5,4) (4,5) (5,5) (4,6) (5,6) (4,7) (5,7) (6,3) (6,4) (6,5) (6,6) (6,7) (7,3) (8,3) (7,4) (8,4) (7,5) (8,5) (7,6) (8,6) (7,7) (8,7) (9,3) (10,3) (9,4) (10,4) (9,5) (10,5) (9,6) (10,6) (9,7) (10,7)
join,leftOuterJoin,rightOuterJoin,fullOuterJoin
- 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD,但是需要注意的是,他只会返回key在两个RDD中都存在的情况。
- join后的分区数与父RDD分区数多的那一个相同。
案例:
val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
val rdd1 = sc.parallelize(Array((1,4),(1,5),(3,6)))
val rdd3 = rdd.join(rdd1)
println(rdd3.collect().mkString(" "))
//(1,(a,4)) (1,(a,5)) (3,(c,6))
rightOutJoin
- join过程中,如果找不到相同的key。则还是保留右边RDD中的数据
案例:
val rdd= sc.parallelize(Array((0,1),(0,2),(2,3)), 3)
val rdd2= sc.parallelize(Array((0,11),(0,22),(3,33)), 3)
rdd.rightOutJoin:join(rdd2).foreach(println)
//结果
(0,(Some(1),11))
(0,(Some(1),22))
(0,(Some(2),11))
(0,(Some(2),22))
(3,(None,33))
leftOutJoin
- join过程中,如果找不到相同的key。则还是保留左边RDD中的数据;
案例:
val rdd= sc.parallelize(Array((0,1),(0,2),(2,3)), 3)
val rdd2= sc.parallelize(Array((0,11),(0,22),(3,33)), 3)
rdd.leftOuterJoin(rdd2).foreach(println)
//结果
(0,(1,Some(11)))
(0,(1,Some(22)))
(0,(2,Some(11)))
(0,(2,Some(22)))
(2,(3,None))
cache
- cache 将 RDD 元素从磁盘缓存到内存。 相当于 persist(MEMORY_ONLY)
函数的功能,cache之后必须跟一个动作算子,不然cache不会启用
persist
- persist 函数对 RDD 进行缓存操作。数据缓存在哪里依据 StorageLevel 这个枚举类型进行确定。
zipWithIndex
- 该函数将RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对
zip
- 将两个RDD中的元素(KV格式/非KV格式)一一对应相组合,变成一个新的KV格式的RDD,两个RDD的partition数量以及元素数量都必须相同,否则会抛出异常。
案例:
val rdd = sc.parallelize(Array(1,2,3,4,5))
val rdd1 = sc.parallelize(Array("A","B","C","D","E"))
var rdd2 = rdd.zip(rdd1)
println(rdd2.collect().mkString(" "))
// (1,A) (2,B) (3,C) (4,D) (5,E)
//元素数量不同,会报错
val rdd3=sc.parallelize(Array("A","B","C","D"))
var rdd4 = rdd.zip(rdd3)
println(rdd4.collect().mkString(" "))
//java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(4, 5)
常用的转换算子小编就整理到这里,后续还会更新动作算子,记得关注!