文章目录
- 简介
- Transformation转换算子
-
- value类型
-
- 1.map()
- 2.mapPartitions()
- 3.mapPartitionsWithIndex()
- 4.flatMap()
- 5.glom()
- 6.groupBy()
- 7.filter()
- 8.sample()
- 9.distinct()
- 10.coalesce()
- 11.reparation()
- 12.sortBy()
- 13.pipe()
- 双value类型
-
- 1.union()并集
- 2.intersection()交集
- 3.subtract ()差集
- 4. zip()拉链
- Key-Value类型
-
- 1.partitionBy()
- 2.reduceByKey()
- 3.groupByKey()
- 4.aggregateByKey()
- 5. foldByKey()
- 6.combineByKey()
- 7.几种ByKey的对比
- 8.sortByKey()
- 9.mapValues()
- 10. join()
- 11.cogroup()
- 2.5Action行动算子
-
- 1.reduce()
- 2.collect()
- 3.count()
- 4.first()
- 5.take()
- 6.takeOrdered()
- 7.aggregate()
- 8.fold()
- 9.countByKey()
- 10.save算子
- 11.foreach()
简介
RDD(Resilient Distributed Dataset)弹性分布式数据集
是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。
- 弹性
- 存储(内存和磁盘可以自动切换)
- 计算(RDD有计算出错重试机制)
- 容错(数据丢失可以自动恢复)
- 分区
- 分布式
- 不同分区中的数据会分配给集群中的不同服务器节点进行计算
- 数据集
- 集合不一样,RDD只封装计算逻辑,不会保存数据
- 数据抽象
- RDD是抽象类,需要子类实现
- 不可变
- RDD封装计算逻辑,是不可变的,若要改变,需要产生新的RDD,在新的RDD里面封装新的计算逻辑
- RDD封装计算逻辑,是不可变的,若要改变,需要产生新的RDD,在新的RDD里面封装新的计算逻辑
1.RDD的特性
- 1)RDD有一组分区,通过一个分区计算函数getPartitions来获取分区
- 2)分区计算函数 compute,可以把分区数据给取出来
- 3)RDD之间的依赖,通过getDependence
- 4)有分区器Partitioner(对于kv类型的RDD),可以控制分区的数据流向
- 5)数据存储优先位置 getPreferedLocation
2.RDD创建方式
- 1)通过内存集合创建
- 2)通过读取外部文件创建
- 3)通过RDD的转换算子得到新的RDD
3.创建RDD分区方式
-
通过内存集合创建
-
默认的分区方式(取决于分配给当前应用的CPU核数)
-
指定分区
// i是集合下标,length是集合长度,numSlices是分区数 def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } }
-
-
读取外部文件创建
- 默认的分区方式(取决于分配给当前应用的CPU核数与2取最小值)
- 指定分区
- FileInputFormat里面有个getSplits(切片)方法
- LineRecordReader里面有个next(按行读取)方法
4.RDD常用算子
- 转换算子Transformation
- 转换算子执行完毕后,会创建新的RDD,并不会马上执行计算
- 1.map
- 对RDD中的元素进行一个个映射
- 2.mapPartitions
- 以分区为单位,对RDD中的元素映射
- 3.mapPartitionsWithIndex
- 以分区为单位,对RDD中的元素进行映射,并且带分区编号
- 4.flatMap
- 对RDD中的元素进行扁平化处理(将整体拆分成个体)
- 5.glom
- 将RDD中每一个分区中的单个元素,转换为数组
- 6.groupBy
- 按照一定的分组规则,对RDD中的元素进行分组
- 7.filter
- 按照一定的规则,对RDD中的元素进行过滤
- 8.sample
- 参数1:是否抽样放回,如果是true,表示放回,否则不放回
- 参数2:若参数1为true,那么参数2表示期望元素出现的次数(这里只是期望,并不代表实际次数),这个数大于等于零;参数1若为false,那么参数2表示每一个元素出现的概率,范围是[0, 1]
- 参数3:随机算法的初始值(一般不用设置,若指定初始值,那么随机生成的数相等)
- takeSample(行动算子):taskSample可以指定随机抽取的个数
- 9.distinct
- 去重,底层是通过map + reduceByKey完成的去重操作
- 改变分区的两个算子
- 10.coalesce
- 一般用于缩减分区,默认情况下是不执行shuffle机制
- 11.repartition
- 一般用于扩大分区,底层调用的是coalesce,会将coalesce第二个参数设置为true,会执行shuffle
- 10.coalesce
- 12.sortBy
- 按照指定规则,对RDD中的元素进行升序,默认升序(可通过参数来设置)
- 13.pipe
- 对于RDD中的每一个分区,都会执行pipe算子中指定的脚本
- 14.Union
- 合集
- 15.intersection
- 交集
- 16subtract
- 差集
- 17.zip
- 拉链,必须要保证两个RDD的分区数以及每个分区中元素的个数一致
- 18.partitionBy
- 按照指定的分区器,通过key对RDD中的元素进行分区
- 默认分区器 HashPartitioner
- 19.reduceByKey
- 将相同的key放在一起,对Value进行聚合操作
- 20.groupByKey
- 按照key对RDD中的元素进行分组
- 21.aggregateByKey(参数1:zeroValue)(参数2:分区内计算规则, 参数3:分区间计算规则)
- 22.foldByKey(参数1:zereValue)(参数2:分区内和分区间计算规则)
- 是aggregateByKey的简化,分区内和分区间计算规则相同
- 23.combineByKey(参数1:对当前key的value进行转换, 参数2:分区内计算规则, 参数3:分区间计算规则)
- 24.sortByKey
- 按照RDD中的key对元素进行排序
- 25.mapValues
- 只对RDD中的Value进行操作
- join&cogroup
- 行动算子Action
- 行动算子执行后,才会触发计算
- 1.reduce
- 对RDD中的元素进行聚合
- 2.collect.foreach和foreach
- 对RDD中的元素进行遍历
- collect.foreach将每一个Excutor中的数据收集到Driver,形成一个新的数组
- .foreach不是一个算子,是集合的方法,是对数组中的元素进行遍历
- 3.count
- 获取RDD中元素的个数
- 4.countByKey
- 获取RDD中每个key对应的元素个数
- 5.first
- 获取RDD中第一个元素
- 6.take
- 获取RDD中的前几个元素
- 7.takeOrdered
- 获取排序后的RDD中的前几个元素
- 8.aggregate&fold
- aggregateByKey 处理kv类型的RDD,并且在进行分区间聚合的时候,初始值不参与运算
- aggregate处理kv类型的RDD,但是在分区间和分区内聚合时初始值都会参与运算
- fold 是aggregate的简化版
- 9.save相关的算子
- saveAsTextFile
- saveAsObjectFile
- saveAsSequenceFile(只针对KV类型RDD)
Transformation转换算子
当我们去调用的时候,会创建一个新的RDD,并不会真正执行计算操作,只是做一些计算逻辑的封装
- RDD整体上分为:
- Value类型
- 双Value类型
- Key-Value类型
Action行动算子
只有行动算子被调用之后才会去做计算
行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。
Transformation转换算子
value类型
1.map()
-
map(fun):返回一个新的RDD,该RDD由每一个输入元素经过fun函数转换后组成.
-
当某个RDD执行map方法时,会遍历RDD中的每一个数据项,并依次应用fun函数,产生一个新的RDD
-
新的RDD分区数和旧的RDD分区数相同
-
代码示例
def main(args: Array[String]): Unit = { // 创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") // 创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2) println("原分区个数" + rdd.partitions.size) val newRDD: RDD[Int] = rdd.map(_ * 2) println("新分区个数" + newRDD.partitions.size) newRDD.collect().foreach(println) // 关闭连接 sc.stop() } ----------------------------------------------------------- 输出结果: 原分区个数2 新分区个数2 2 4 6 8
2.mapPartitions()
-
以分区为单位执行Map算子
-
类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。
-
假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
-
参数是一个可迭代的集合
-
map是一次处理一个元素,mapPartitions是一次处理一个分区的元素,mapPartitions适用于批处理操作
-
代码示例(对RDD中的元素进行乘2操作)
def main(args: Array[String]): Unit = { // 创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") // 创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2) // 以分区为单位,对RDD中的元素进行映射 val newRDD: RDD[Int] = rdd.mapPartitions(datas => { datas.map(_ * 2) }) newRDD.collect().foreach(println) // 关闭连接 sc.stop() } --------------------------------------- 输出结果 2 4 6 8
适用于批处理操作
比如将RDD中的元素插入到数据库中,需要数据库连接,如果每一个元素都创建一个连接,效率很低,可以对每个分区的元素创建一个连接mapPartitions每次处理一批数据,这个分区中数据处理完之后,原来的RDD中分区的数据才会释放,可能会导致内存溢出
3.mapPartitionsWithIndex()
两个参数,一个是当前分区的编号,另外一个是分区的数据(可以单独对某一个分区进行操作)
-
代码示例
-
需求1
-
对RDD元素做映射,将元素变为元组,元组里面分别表示分区编号和当前元素
def main(args: Array[String]): Unit = { // 创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") // 创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2) val newRDD: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex( (index, datas) => { datas.map((index, _)) } ) newRDD.collect().foreach(println) // 关闭连接 sc.stop() } ----------------------------------------------------------------- 输出结果 (0,1) (0,2) (1,3) (1,4)
-
需求2
-
第二个分区乘2,其他分区不变
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 3) val newRDD: RDD[Int] = rdd.mapPartitionsWithIndex( (index, datas) => { index match { case 1 => datas.map(_ * 2) case _ => datas } } ) newRDD.collect().foreach(println) -------------------------------------- 输出结果 1 2 6 8 10 6 7 8
-
4.flatMap()
功能与map相似,将RDD中的每一个元素通过应用f函数依次转换为新函数,并封装到RDD中
跟map的区别在于:
- flatMap中f函数返回值是一个集合,将RDD集合中每一个元素拆分出来放到新的RDD当中
代码示例
-
需求
-
对集合进行扁平化处理,将一个大的集合(里面有子集合)里面数据取出,放入一个大的集合当中
def main(args: Array[String]): Unit = { // 创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") // 创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6), List(7, 8)), 2) // 如果匿名函数输入和输出相同,那么不能进行简化 val newRDD: RDD[Int] = rdd.flatMap(datas => datas) newRDD.collect().foreach(println) // 关闭连接 sc.stop() } ---------------------------------------------- 1 2 3 4 5 6 7 8
-
5.glom()
def glom(): RDD[Array[T]]
将RDD中每一个分区中元素组合成一个数组,封装到新的RDD当中,数组中元素的类型保持不变
(flatMap是将整体转换成个体,glom是将个体转换成整体)
代码示例
-
需求
-
将RDD中每个分区里的数据放到数组当中
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) var rdd: RDD[Int] =sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2) println("----------没有进行glom之前--------------") rdd.mapPartitionsWithIndex( (index, datas) => { println("当前分区:" + index + " " + "元素:"+ datas.mkString(",")) datas } ).collect() println("----------调用glom之后--------------") val newRDD: RDD[Array[Int]] = rdd.glom() newRDD.mapPartitionsWithIndex( (index, datas) => { println("当前分区:" + index + " " + "元素:"+ datas.next().mkString(",")) datas } ).collect() sc.stop() } ------------------------------------------- 输出结果 ----------没有进行glom之前-------------- 当前分区:1 元素:4,5,6 当前分区:0 元素:1,2,3 ----------调用glom之后-------------- 当前分区:0 元素:1,2,3 当前分区:1 元素:4,5,6
-
6.groupBy()
def groupBy[K](f T => K) (implicit kt: ClassTag[K]): RDD[(k, Iterable[T])]
分组,按照传入函数的返回值进行分组,相同key对应的值放入一个迭代器(按照指定的规则,对RDD中的元素进行分组)
groupBy会存在shuffle的过程进行落盘
代码示例
-
需求
-
将RDD中奇数放在一组,偶数放到另一组
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) val rdd: RDD[Int] =sc.makeRDD(1 to 9, 3) println("-------------执行分组前----------------") rdd.mapPartitionsWithIndex( (index, datas) => { println(s"分区号:$index " + "元素:" + datas.mkString(",")) datas } ).collect() val newRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2) println("-------------执行分组后----------------") newRDD.mapPartitionsWithIndex( (index, datas) => { println(s"分区号:$index " + "元素:" + datas.mkString(",")) datas } ).collect() sc.stop() } -------------------------------------------- 输出结果 -------------执行分组前---------------- 分区号:0 元素:1,2,3 分区号:2 元素:7,8,9 分区号:1 元素:4,5,6 -------------执行分组后---------------- 分区号:2 元素: 分区号:1 元素:(1,CompactBuffer(1, 3, 5, 7, 9)) 分区号:0 元素:(0,CompactBuffer(2, 4, 6, 8))
-
给出一个RDD,将相同的单词放到一组
val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Peanut", "Hadoop", "Hive")) val newRDD: RDD[(String, Iterable[String])] = rdd.groupBy(elem => elem) newRDD.collect().foreach(println) ------------------------------------------- 输出结果 (Hive,CompactBuffer(Hive)) (Peanut,CompactBuffer(Peanut, Peanut)) (Hadoop,CompactBuffer(Hadoop, Hadoop))
-
用以上的算子来实现一个简单版的WordCount
List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark")
首先从集合中读取数据,用flatMap对RDD中的数据进行扁平化处理,通过map来对每一个单词设置为元组,每个单词对应的数为1,使用groupBy将相同的key放到一组,对相同的单词进行累加
-
实现方式1
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark")) // 对RDD中的元素进行扁平化映射 val flatMap: RDD[String] = rdd.flatMap(_.split(" ")) // 将映射后的数据进行结构转换,为每个单词进行计数 val mapRDD: RDD[(String, Int)] = flatMap.map((_, 1)) // 按照key对RDD中的元素进行分组 val groupByRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupBy(_._1) // 对分组后的元素再进行映射 val resRDD: RDD[(String, Int)] = groupByRDD.map({ case (word, datas) => { (word, datas.size) } }) resRDD.collect().foreach(println) sc.stop() } ------------------------------------------------ 输出结果 (Peanut,1) (Spark,3) (Hadoop,2)
-
实现方式2
- 其实在方式1当中,进行扁平化处理之后,不需要再将结果映射成map,可以直接对结果按照key进行groupBy,最后用map统计相同key的长度即为结果
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark")) // 对RDD中的元素进行扁平化映射 val flatMap: RDD[String] = rdd.flatMap(_.split(" ")) // 将RDD中的单词进行分组 val groupByRDD: RDD[(String, Iterable[String])] = flatMap.groupBy(word => word) // 对分组后的元素再进行映射 val resRDD: RDD[(String, Int)] = groupByRDD.map({ case (word, datas) => { (word, datas.size) } }) resRDD.collect().foreach(println) sc.stop() } ------------------------------------------------ 输出结果 (Peanut,1) (Spark,3) (Hadoop,2)
用以上的算子来实现一个复杂版的WordCount
数字代表出现的次数
List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)
- 方式1
- 先用map对其进行字符串拼接
- 再进行扁平化处理,对结果按照key进行groupBy,最后用map统计相同key的长度即为结果
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)))
// 将原来RDD中字符串以及字符串出现的次数进行处理,形成一个新的字符串
val rdd1: RDD[String] = rdd.map{
case (str, count) => {
(str + " ") * count
}
}
println("输出当前rdd1")
rdd1.collect().foreach(println)
println("-------------------------------------")
// 对RDD中的元素进行扁平化映射
val flatMap: RDD[String] = rdd1.flatMap(_.split(" "))
// 将RDD中的单词进行分组
val groupByRDD: RDD[(String, Iterable[String])] = flatMap.groupBy(word => word)
// 对分组后的元素再进行映射
val resRDD: RDD[(String, Int)] = groupByRDD.map({
case (word, datas) => {
(word, datas.size)
}
})
resRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------------------------------------
输出结果
输出当前rdd1
Peanut Spark Peanut Spark
Hello Spark Hello Spark Hello Spark
Hello World Hello World
-------------------------------------
(Peanut,2)
(Hello,5)
(World,2)
(Spark,5)
- 方式2
- 将元素转换成元组,然后进行分组统计
- (“Peanut Spark”, 2) ----> (“Peanut”, 2) (“Spark”, 2)
def main(args: Array[String]): Unit = {
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)))
val flatMapRDD: RDD[(String, Int)] = rdd.flatMap {
case (words, count) => {
// 对多个单词组成的字符串进行切割
words.split(" ").map((_, count))
}
}
// 按照单词对RDD中的元素进行分组
val groupByRDD: RDD[(String, Iterable[(String, Int)])] = flatMapRDD.groupBy(_._1)
val resRDD: RDD[(String, Int)] = groupByRDD.map{
case (word, datas) =>
(word, datas.map(_._2).sum)
}
resRDD.collect().foreach(println)
}
--------------------------------------------------------------------------------
输出结果
(Peanut,2)
(Hello,5)
(World,2)
(Spark,5)
7.filter()
def filter(f T=> Boolean): RDD[T]
RDD调用filter时,对RDD中的每一个元素应用f函数,将返回值为true的元素添加到新的RDD当中
按照指定的过滤规则,对RDD中的元素进行过滤
代码示例
-
需求1
List("Peanut", "zhangsan", "lisi", "wangwu", "xiaoliu", "xiaowang")
- 将集合中包含"xiao"的字符串提取出来
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) val rdd: RDD[String] = sc.makeRDD(List("Peanut", "zhangsan", "lisi", "wangwu", "xiaoliu", "xiaowang")) val newRDD: RDD[String] = rdd.filter(_.contains("xiao")) newRDD.collect().foreach(println) sc.stop() } ------------------------------------------- 输出结果 xiaoliu xiaowang
-
需求2
List(1, 2, 3, 4, 5, 6, 7, 8, 9)
- 将所有的偶数过滤出来
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9)) // 把所有的奇数过滤出来 val newRDD: RDD[Int] = rdd.filter(_ % 2 == 0) newRDD.collect().foreach(println) sc.stop() } ------------------------------------------------------- 输出结果 2 4 6 8
8.sample()
随机抽样
def sample(
withReplacement: Boolean, // true为有放回的抽样,false为无放回抽样
fraction: Double, // 当withReplacement = false时,表示每个元素的概率,范围是[0,1]
// 当withReplacement = true时,表示每个元素的期望次数,范围是 >= 0
seed: Long = Untils.random.nextLong // send表示随机数生成器种子(生成随机数的初始值,种子一样时,生成的随机数相同)抽样算法的初始值,一般不需要指定
): RDD[T]
代码示例
- 需求1
- 抽样放回
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(1 to 10)
val newRDD: RDD[Int] = rdd.sample(true, 1)
newRDD.collect().foreach(println)
sc.stop()
}
-----------------------------------------------------
输出结果(每次执行的结果都不同)
1
2
2
2
2
5
5
6
6
6
7
10
10
10
最后的结果是随机的,第二个参数设置期望出现次数为1,只是期望,并不代表每个数字都出现一次。
- 需求2
- 抽样不放回
- 那么此时第二个参数代表的就是每个数出现的概率,如果设为1,则每个数都出现一次
- 若为0,则都不出现
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(1 to 10)
// 从RDD中随机抽取数据(抽样不放回)
val newRDD: RDD[Int] = rdd.sample(false, 0.3)
newRDD.collect().foreach(println)
sc.stop()
}
---------------------------------------------
3
5
7
takeSample可以在抽取时指定抽取的个数(在第二个参数中指定抽取的个数)
- 小案例
- 从名单中抽取三名幸运观众
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val stds = List("张1", "张2", "张3", "张4", "张5", "张6", "张7", "张8", "张9",
"张10", "张11", "张12", "张13", "张14", "张15", "张16",
"张17", "张18", "张19", "张21", "张20", "张22")
val nameRDD: RDD[String] = sc.makeRDD(stds)
// 从上面RDD当中抽取一名幸运观众
val res: Array[String] = nameRDD.takeSample(false, 3)
res.foreach(println)
sc.stop()
}
------------------------------------------
输出结果
张18
张13
张1
9.distinct()
去重算子
def distinct(): RDD[T]
distinct算子是对内部元素进行去重,将去重后的结果放到新的RDD当中
默认情况下,distinct会生成与原分区个数一致的分区数
distinct有两种,一种是不加参数,一种是可以指定分区数
代码示例
- 需求1
- 对集合中的数去重,不指定分区数
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 4, 3, 3), 5)
// 对RDD中数据进行去重
val newRDD: RDD[Int] = rdd.distinct()
newRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------
输出结果
5
1
2
3
4
- 需求2
- 对集合中的数据去重,指定分区数
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 4, 3, 3), 5)
rdd.mapPartitionsWithIndex{
(index, datas) =>
println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
datas
}.collect()
println("------------去重前后分区对比----------------")
// 对RDD中数据进行去重
val newRDD: RDD[Int] = rdd.distinct(2)
newRDD.mapPartitionsWithIndex{
(index, datas) =>
println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
datas
}.collect()
sc.stop()
}
--------------------------------------------------------
输出结果
分区:4---> 分区元素:3,3
分区:1---> 分区元素:3,4
分区:0---> 分区元素:1,2
分区:2---> 分区元素:5,5
分区:3---> 分区元素:4,4
------------去重前后分区对比----------------
分区:1---> 分区元素:1,3,5
分区:0---> 分区元素:4,2
10.coalesce()
coalesce算子可以重新分区
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
代码示例
- 需求1
- 缩减分区
- 默认情况下,如果使用coalesce扩大分区是不起作用的,因为底层没有执行shuffle,如果扩大分区,使用reparation
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
rdd.mapPartitionsWithIndex{
(index, datas) =>
println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
datas
}.collect()
println("*****************缩减分区前后********************")
// 缩减分区
val newRDD: RDD[Int] = rdd.coalesce(2)
newRDD.mapPartitionsWithIndex{
(index, datas) =>
println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
datas
}.collect()
sc.stop()
}
----------------------------------------------------------
分区:0---> 分区元素:1,2
分区:2---> 分区元素:5,6
分区:1---> 分区元素:3,4
***************缩减分区前后**********************
分区:0---> 分区元素:1,2
分区:1---> 分区元素:3,4,5,6
11.reparation()
重新分区,扩大分区时,可以使用reparation,reparation会执行shuffle
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
rdd.mapPartitionsWithIndex{
(index, datas) =>
println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
datas
}.collect()
// 使用reparation扩大分区
val newRDD: RDD[Int] = rdd.repartition(4)
println("******************扩大分区前后*******************")
newRDD.mapPartitionsWithIndex{
(index, datas) =>
println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
datas
}.collect()
sc.stop()
}
-----------------------------------
输出结果
分区:2---> 分区元素:5,6
分区:1---> 分区元素:3,4
分区:0---> 分区元素:1,2
******************扩大分区前后*******************
分区:2---> 分区元素:
分区:1---> 分区元素:
分区:3---> 分区元素:1,3,5
分区:0---> 分区元素:2,4,6
repartition的底层其实是调用的coalesce,只不过在调用时,第二个参数指定执行shuffle机制
请看源码:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
- coalesce
- 第二个参数默认为false,底层默认不执行shuffle
- 一般用于缩减分区
- repartition
- 底层调用coalesce,调用coalesce时第二个参数为true,执行shuffle
- 一般用于扩大分区
12.sortBy()
排序
对RDD中的元素进行排序,排序时需要指定排序规则
def sortBy[K](
f: (T) => K,
ascending: Boolean = true, // 默认为正序排序
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
代码示例
- 需求1
- 升序排序
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 4, 6, 5, 3, 2))
// 升序排序
val sortedRDD: RDD[Int] = rdd.sortBy(num => num)
sortedRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------------------------
输出结果
1
2
3
4
5
6
- 需求2
- 降序排序(sortBy的第二个参数需要指定为false)
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 4, 6, 5, 3, 2))
// 降序排序
val sortedRDD: RDD[Int] = rdd.sortBy(num => num, false)
sortedRDD.collect().foreach(println)
sc.stop()
}
-----------------------------------------------------------------
输出结果
6
5
4
3
2
1
- 需求3
- 对字符串进行排序
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val strRDD: RDD[String] = sc.makeRDD(List("1", "6", "2", "10"))
println("-----------对字符串按字典序排序-------------")
// 按照字符串字典顺序进行排序
val sortedRDD: RDD[String] = strRDD.sortBy(elem => elem)
sortedRDD.collect().foreach(println)
println("------------将字符串按照字面的数字来进行排序---------------")
// 按照字符串转换为整数后的大小进行排序
val sortedRDD2: RDD[String] = strRDD.sortBy(_.toInt)
sortedRDD2.collect().foreach(println)
sc.stop()
}
-----------------------------------------
输出结果
-----------对字符串按字典序排序-------------
1
10
2
6
------------将字符串按照字面的数字来进行排序---------------
1
2
6
10
13.pipe()
def pipe(command: String): RDD[String]
针对每一个分区,都调用一次shell脚本,返回输出的RDD
双value类型
1.union()并集
def union(other: RDD[T]): RDD[T]
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
// 求并集
val newRDD: RDD[Int] = rdd1.union(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------
1
2
3
4
3
4
5
6
7
2.intersection()交集
def intersection(other: RDD[T]): RDD[T]
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
// 求交集
val newRDD: RDD[Int] = rdd1.intersection(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
---------------------
输出结果
3
4
3.subtract ()差集
def subtract(other: RDD[T]): RDD[T]
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
// 求差集
val newRDD: RDD[Int] = rdd1.subtract(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
--------------------------------------------
输出结果
1
2
4. zip()拉链
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
拉链的功能是将两个RDD中的元素合并成元组,元组中key为第一个RDD中的元素,value为第二个RDD中的元素
要求两个RDD的分区数量以及元素数量都相等,否则会抛异常
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))
// 拉链
val newRDD: RDD[(Int, Int)] = rdd1.zip(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
----------------------------------------
输出结果
(1,3)
(2,4)
(3,5)
(4,6)
Key-Value类型
1.partitionBy()
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
按照key从新分区,将RDD[K, V]中的K,按照指定Partitioner重新进行分区如果原有的partitionRDD和现有partitionRDD是一致的话,就不进行分区,否则会产生shuffe过程。
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 创建RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c")), 3)
rdd.mapPartitionsWithIndex(
(index, datas) =>{
println("分区号:" + index + "---->" + "分区数据" + datas.mkString(","))
datas
}
).collect()
println("**************分区前后对比*****************")
val newRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
newRDD.mapPartitionsWithIndex(
(index, datas) =>{
println("分区号:" + index + "---->" + "分区数据" + datas.mkString(","))
datas
}
).collect()
sc.stop()
}
----------------------------------------
输出结果
分区号:0---->分区数据(1,a)
分区号:2---->分区数据(3,c)
分区号:1---->分区数据(2,b)
**************分区前后对比*****************
分区号:1---->分区数据(1,a),(3,c)
分区号:0---->分区数据(2,b)
自定义分区器
class MyPartitioner(partitions: Int) extends Partitioner {
// 获取分区个数
override def numPartitions: Int = partitions
// 指定分区规则 返回值Int表示分区编号,从0开始
override def getPartition(key: Any): Int = {
// 此时返回为1,那么所有的元素都会被分到1号区
1
}
}
------------------------------------------------------------------------------
将val newRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))中new HashPartitioner(2)换成new MyPartitioner(2)
输出结果
分区号:1---->分区数据(2,b)
分区号:0---->分区数据(1,a)
分区号:2---->分区数据(3,c)
**************分区前后对比*****************
分区号:0---->分区数据
分区号:1---->分区数据(1,a),(2,b),(3,c)
2.reduceByKey()
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
reduceByKey可以将RDD[K, V]中的元素按照相同的K对V进行聚合
可以指定新的RDD中分区器或分区数
代码示例
- 将集合中相同key的元组进行聚合
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 4), ("b", 3)))
val resRDD: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
resRDD.collect().foreach(println)
sc.stop()
}
--------------------------------------------------------------
输出结果
(a,5)
(b,5)
3.groupByKey()
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
groupByKey对集合中的每个key进行操作,会将相同key对应的value放到一个集合当中,不进行聚合
可以指定新的RDD中分区器(默认使用的分区器是HashPartitioner)或分区数
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 2), ("a", 4), ("b", 3)))
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
---------------------------------
输出结果
(a,CompactBuffer(5, 4))
(b,CompactBuffer(2, 3))
- wordcount案例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 2), ("a", 4), ("b", 3)))
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
val resRDD = groupRDD.map {
case (key, datas) => {
(key, datas.sum)
}
}
resRDD.collect().foreach(println)
sc.stop()
}
---------------------------------------------------------------
输出结果
(a,9)
(b,5)
- reduceByKey用来做聚合
- groupByKey用来做分组
4.aggregateByKey()
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
aggregateByKey是按照key对分区内以及分区间的数据进行处理
aggregateByKey(参数1)(参数2, 参数3)
-
参数1: 初始值
-
参数2: 分区内计算规则
-
参数3: 分区间计算规则
代码示例
- 需求1
- 用aggregateByKey实现wordCount
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 创建RDD
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("c", 5), ("b", 5), ("c", 1), ("b", 2)), 2)
// aggregateByKey实现wordCount
val newRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(_ + _, _ + _)
newRDD.collect().foreach(println)
sc.stop()
}
----------------------------------------------
输出结果
(b,7)
(a,6)
(c,6)
- 需求2
- 对每个分区内的最大值求和
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 创建RDD
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("c", 5), ("a", 2), ("a", 3), ("b", 5), ("a", 3), ("c", 3), ("b", 2)), 2)
rdd.mapPartitionsWithIndex(
(index, datas) => {
println("分区:" + index + "---> 分区数据:" + datas.mkString(","))
datas
}
).collect()
println("求分区最大值")
val newRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(
// 分区内的计算规则,找出最大值,max在进行比较时,首先是将第一个元素跟初始值比较
(x, y) => math.max(x, y),
// 分区间计算规则
(a, b) => a + b
)
newRDD.collect().foreach(println)
sc.stop()
}
-----------------------------------------------------
输出结果
分区:0---> 分区数据:(c,5),(a,2),(a,3)
分区:1---> 分区数据:(b,5),(a,3),(c,3),(b,2)
求分区最大值
(b,5)
(a,6)
(c,8)
当分区内计算逻辑和分区间计算逻辑不一样时,可以考虑使用aggregateByKey
5. foldByKey()
def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
- 参数含义
- zeroValue代表初始值,可以取任意类型
- func 代表一个函数
- Partitioner 分区器
- numPartitions 分区数
foldByKey是aggregateByKey的简化版本,如果分区内和分区间计算逻辑相同,那么可以使用foldByKey,foldByKey和reduceByKey之间的差别在于foldKey可以指定初始值
代码示例
- 需求
- 按照key对分区内和分区间的数据进行相加(用foldByKey实现wordCount)
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 创建RDD
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("c", 2), ("a", 1), ("a", 2), ("b", 5), ("a", 3), ("c", 3), ("b", 2)), 2)
val newRDD: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _)
newRDD.collect().foreach(println)
sc.stop()
}
------------------------------------------------------
(b,7)
(a,6)
(c,5)
6.combineByKey()
转换结构后分区内和分区间操作
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)]
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)]
参数说明:
-
createCombiner
- 对RDD中当前key取出第一个value做初始化,把当前的值作为参数,对该值可以进行一些转换操作,转换成我们想要的格式,比如读进来(10),可以转换为(10, 1)
-
mergeValue
- 分区内的计算规则,会将当前分区的value值(除去第一个value剩下的value),合并到初始化得到的c上面
-
mergeCombiners
- 分区间的计算规则
combineByKey算子的使用场景
代码示例
List(("LaoWang", 90), ("LaoLiu", 80), ("LaoWang", 85), ("LaoLiu", 60)
List中表示的是学生姓名和成绩,一个学生有不同科目的成绩
- 需要求出每个学生的平均成绩
- 先对读进来的数据进行溢写格式转换,比如将"LaoWang"对应的90转换为(90, 1),加上一个1是为了后面方便计算每个同学总共的科目数量
- 然后在分区内对分数相加,每加一次,课程门数加1
- 分区间,将分数以及课程门数分别相加
- 得到每个同学的总成绩以及课程数量之后,通过map来求平均值
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 需求,求出每一个学生的平均成绩
// 创建RDD
val scoreRDD: RDD[(String, Int)] = sc.makeRDD(List(("LaoWang", 90), ("LaoLiu", 80), ("LaoWang", 85), ("LaoLiu", 60)), 2)
val combineRDD: RDD[(String, (Int, Int))] = scoreRDD.combineByKey(
// 初始化
(_, 1),
// 分区内计算规则 将分数相加,每加一次,课程门数加1
(t1: (Int, Int), v) => {
(t1._1 + v, t1._2 + 1)
},
// 分区间计算规则,将分数以及课程门数分别相加
(tup2: (Int, Int), tup3: (Int, Int)) => {
(tup2._1 + tup3._1, tup2._2 + tup2._2)
}
)
// 求平均成绩
val resRDD: RDD[(String, Int)] = combineRDD.map {
case (name, (score, count)) => {
(name, score / count)
}
}
resRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------------------------------------------------------
输出结果
(LaoLiu,70)
(LaoWang,87)
7.几种ByKey的对比
- 如果分区内和分区间计算逻辑相同,并且不需要指定初始值,那么优先使用reduceByKey
- 如果分区内和分区间计算逻辑相同,并且需要指定初始值,那么优先使用foldByKey
- 如果分区内和分区间计算逻辑不相同,并且需要指定初始值,那么优先使用aggregateByKey
- 需要对读入的RDD中数据进行格式转换时,并且要处理分区内和分区间的逻辑,那么优先使用combineByKey
8.sortByKey()
按照K进行排序
在一个(K, V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]
参数说明
- 第一个参数代表排序规则,true为默认,默认为升序
- 第二个参数为分区数,默认跟当前情况下分区数相同
代码示例
- 按照key升序排序
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 创建RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((2, "aa"), (6, "dd"), (4, "bb"), (1, "cc")))
// 按照key对RDD中的元素进行排序 Int已经默认实现了Ordered接口
val newRDD: RDD[(Int, String)] = rdd.sortByKey()
newRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------------------------------
(1,cc)
(2,aa)
(4,bb)
(6,dd)
如果想降序排序,只需在方法里设置第一个参数为false即可
val newRDD: RDD[(Int, String)] = rdd.sortByKey(false)
- 如果key为自定义类型,要求必须混入Ordered特质
- 自定义一个Student类,要求先按照名称升序,如果名称相同的话,再按照年龄降序
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val stdList: List[(Student, Int)] = List(
(new Student("zhangsan", 18), 1),
(new Student("lisi", 12), 1),
(new Student("wangwu", 20), 1),
(new Student("wangwu", 23), 1)
)
val stdRDD: RDD[(Student, Int)] = sc.makeRDD(stdList)
val resRDD: RDD[(Student, Int)] = stdRDD.sortByKey()
resRDD.collect().foreach(println)
sc.stop()
}
-----------------------------------------------------------------------------
输出结果
(Student(lisi, 12) ,1)
(Student(wangwu, 23) ,1)
(Student(wangwu, 20) ,1)
(Student(zhangsan, 18) ,1)
Student类定义如下
class Student(var name: String, var age: Int) extends Ordered[Student] with Serializable {
// 指定比较规则
override def compare(that: Student): Int = {
// 先按照名称升序,如果名称相同的话,再按照年龄降序
var res: Int = this.name.compareTo(that.name)
if (res == 0) {
// res 等于0 说明名字相同,此时按照年龄降序排序
res = that.age - this.age
}
res
}
override def toString: String = s"Student($name, $age) "
}
9.mapValues()
def mapValues[U](f: V => U): RDD[(K, U)]
对kv类型的RDD中的value部分进行映射
代码示例
- 需求
- 对RDD中的value添加字符串" --> "
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d")))
val newRDD: RDD[(Int, String)] = rdd.mapValues(" --> " + _)
newRDD.collect().foreach(println)
sc.stop()
}
----------------------------------------------------
输出结果
(1, --> a)
(2, --> c)
(1, --> b)
(3, --> d)
10. join()
我们先来回顾一下SQL中的连接
- 连接
- 两张表或者多张表结合在一起获取数据的过程
- 内连接
- 两张表进行连接查询,将两张表中完全匹配的记录查询出来
- 等值连接(某个属性值相等)
- =
- 非等值连接(确定一个范围)
- between and …
- 自连接(本张表和本张表连接)
- 外连接
- 两张表进行连接查询,将其中一张表的数据全部查询出来,如果另外一张表中有数据无法与其匹配,则会自动模拟出空值与其进行匹配
- 左(外)连接
- 左连接是左边表的所有数据都有显示出来,右边的表数据只显示共同有的那部分,没有对应的部分补空显示
- 右(外)连接
- 右连接和左连接相反的
- 全连接
- 返回两个表中的所有的值,没有对应的数据则输出为空
- 返回两个表中的所有的值,没有对应的数据则输出为空
join()是将相同key对应的多个value关联在一起
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
对于两个RDD ,类型分别为(k, v)和(k, w),调用join算子时,返回一个相同key对应的所有元素组合在一起的RDD(k, (v, w)),简单点来说,就是k不变,将value从新组合放到一起
如果key只是某一个RDD中有,那么这个key不会关联,类似于内连接,将两张表完全匹配的数据给显示出来
代码示例
- 将两个RDD进行join
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))
// join算子相当于内连接,将两个RDD中的key相同的数据匹配,如果key匹配不上,那么数据不关联
val newRDD: RDD[(Int, (String, Int))] = rdd.join(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
---------------------------------------------------------------------------
输出结果
(1,(a,3))
(1,(a,2))
(1,(b,3))
(1,(b,2))
(2,(c,5))
(3,(d,3))
- 左外连接 leftOuterJoin(对于没有匹配上的,会用空值进行匹配)
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))
val newRDD: RDD[(Int, (String, Option[Int]))] = rdd.leftOuterJoin(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
-----------------------------------------------------------------------
(1,(a,Some(3)))
(1,(a,Some(2)))
(1,(b,Some(3)))
(1,(b,Some(2)))
(2,(c,Some(5)))
(3,(d,Some(3)))
(4,(e,None))
11.cogroup()
类似于全连接,但是在同一个RDD中对key聚合
操作两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。
举例
- RDD1(List(1,“a”), (1,“b”))
- RDD2 (List(1, “c”),(1,“d”))
- cogruop之后,结果为(1,[“a”,“b”],[“c”,“d”])
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))
// cogroup
val newRDD: RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd.cogroup(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
---------------------------------------------------------------
输出结果
(1,(CompactBuffer(a, b),CompactBuffer(3, 2)))
(2,(CompactBuffer(c),CompactBuffer(5)))
(3,(CompactBuffer(d),CompactBuffer(3)))
(4,(CompactBuffer(e),CompactBuffer()))
2.5Action行动算子
1.reduce()
def reduce(f: (T, T) => T): T
聚合
f函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
// reduce
val res: Int = rdd.reduce(_ + _)
println(res)
sc.stop()
}
-----------------------------------------------------
输出结果
10
2.collect()
def collect(): Array[T]
以数组Array的形式返回数据集的所有元素
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
rdd.foreach(println)
println("-------------------------")
val ints: Array[Int] = rdd.collect()
ints.foreach(println)
sc.stop()
}
----------------------------------------------------
输出结果
1
3
2
4
-------------------------
1
2
3
4
注意:虽然用rdd直接调用foreach进行输出和先调用collect再进行输出,结果都可以出来,但是调用collect之后,会把数据先收集到driver端,数据是全部存到一个数组当中;而不调用直接输出时,不同分区的数据是在不同的Executor并行输出,我们会看到没调用collect进行输出的话,元素的顺序可能会跟原顺序不一样
3.count()
计数,返回RDD中元素的个数
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val res: Long = rdd.count()
println(res)
sc.stop()
}
---------------
输出结果
4
4.first()
返回RDD中的第一个元素
def first(): T
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// first 返回RDD中的第一个元素
val res: Int = rdd.first()
println(res)
sc.stop()
}
-------------------
输出结果
1
5.take()
take算子可以返回由RDD前n个元素组成的数组
def take(num: Int): Array[T]
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// take 返回rdd前n个元素组成的数组
val res: Array[Int] = rdd.take(3)
println(res.mkString(","))
sc.stop()
}
-----------------------------------------------
输出结果
1,2,3
6.takeOrdered()
返回RDD排序后前n个元素组成的数组
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(3, 1, 2, 5, 4), 2)
// takeOrdered(n) 获取rdd排序后前n个元素
val ans: Array[Int] = rdd.takeOrdered(3)
println(ans.mkString(","))
sc.stop()
}
----------------------------------------------
输出结果
1,2,3
7.aggregate()
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
- 参数解释
- 第一个参数是设置的初始值,这个初始值在分区内和分区间都会用到
- 第二个参数是分区内的操作逻辑
- 第三个参数是分区间的操作逻辑
在进行分区内的逻辑时,每个分区会通过分区内逻辑和初始值进行聚合(例如:初始值为1,分区计算逻辑为相加,那么分区内的元素聚合时,最后的结果还要加一次初始值1),然后分区间也会通过分区间逻辑和初始值进行操作。
代码示例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
rdd.mapPartitionsWithIndex(
(index, datas) => {
println("分区号:" + index + "-> 分区数据:" + datas.mkString(","))
datas
}
).collect()
// agreegate
val res: Int = rdd.aggregate(1)(_ + _, _ + _)
println(res)
sc.stop()
}
---------------------------------------------------
输出结果
分区号:4-> 分区数据:
分区号:2-> 分区数据:
分区号:1-> 分区数据:1
分区号:5-> 分区数据:3
分区号:7-> 分区数据:4
分区号:0-> 分区数据:
分区号:6-> 分区数据:
分区号:3-> 分区数据:2
19
这里有点不太好理解,我解释一下代码,代码首先是一个集合,数据在八个分区中,由输出结果可看出,1,3,5,7号分区中有数据,分区里的逻辑是相加,相加时每个分区进行分区间的逻辑操作(相加)时,分区里元素相加的结果还要加上初始值1
然后分区间的逻辑也是相加,分区间相加时也会加上初始值
总共8个分区,每个分区都要加一次初始值1,总共要加8,所有分区里的原始数据相加总共是10,那么现在总共是18。分区间在进行相加操作时,还要加一次初始值,即为结果19。
读者可以通过修改rdd分区数以及初始值来体会这个算子。
8.fold()
def fold(zeroValue: T)(op: (T, T) => T): T
fold 是aggregate的简化,分区内和分区间计算规则相同
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 3)
rdd.mapPartitionsWithIndex(
(index, datas) => {
println("分区号:" + index + "-> 分区数据:" + datas.mkString(","))
datas
}
).collect()
val res: Int = rdd.fold(10)(_ + _)
println(res)
sc.stop()
}
---------------------------------------------------------------------------
输出结果
分区号:1-> 分区数据:2
分区号:2-> 分区数据:3,4
分区号:0-> 分区数据:1
50
初始值为10,3个分区,所以要另外加上30,分区间操作还要加上初始值,那么总共要另外加40,40再加上原始的总值10,结果为50
9.countByKey()
统计每种key的个数
def countByKey(): Map[K, Long]
- 参数解释
- 第一个参数代表出现的key
- 第二个参数为此key出现的次数
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
val res: collection.Map[Int, Long] = rdd.countByKey()
println(res)
sc.stop()
}
----------------------------------------------------------------------------------------
输出结果
Map(1 -> 2, 2 -> 1, 3 -> 2)
10.save算子
saveAsTextFile(path)
- 保存成Text文件
- 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统
- 对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
**saveAsSequenceFile(path) **
- 保存成Sequencefile文件
- 将数据集中的元素以Hadoop Sequencefile的格式保存到指定的目录下
- 可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
- 序列化成对象保存到文件
- 将RDD中的元素序列化成对象,存储到文件中。
// save 相关算子
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 1)
// 保存为文本文件,指定保存路径
rdd.saveAsTextFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output")
// 保存为序列化文件
rdd.saveAsObjectFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output1")
// 保存为SequenceFile (只支持KV类型的RDD)
rdd.map((_, 1)).saveAsSequenceFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output2")
11.foreach()
遍历RDD中每一个元素,前面的代码大多数地方都用过这个算子了,这里就不再赘述。