上万字的spark算子总结,附带每个算子的代码示例

   日期:2020-09-28     浏览:97    评论:0    
核心提示:文章目录简介Transformation转换算子value类型1.map()2.mapPartitions()3.mapPartitionsWithIndex()4.flatMap()5.glom()6.groupBy()7.filter()8.sample()9.distinct()10.coalesce()11.reparation()12.sortBy()13.pipe()双value类型1.union()并集2.intersection()交集3.subtract ()差集4. zip(

文章目录

  • 简介
  • Transformation转换算子
    • value类型
      • 1.map()
      • 2.mapPartitions()
      • 3.mapPartitionsWithIndex()
      • 4.flatMap()
      • 5.glom()
      • 6.groupBy()
      • 7.filter()
      • 8.sample()
      • 9.distinct()
      • 10.coalesce()
      • 11.reparation()
      • 12.sortBy()
      • 13.pipe()
    • 双value类型
      • 1.union()并集
      • 2.intersection()交集
      • 3.subtract ()差集
      • 4. zip()拉链
    • Key-Value类型
      • 1.partitionBy()
      • 2.reduceByKey()
      • 3.groupByKey()
      • 4.aggregateByKey()
      • 5. foldByKey()
      • 6.combineByKey()
      • 7.几种ByKey的对比
      • 8.sortByKey()
      • 9.mapValues()
      • 10. join()
      • 11.cogroup()
  • 2.5Action行动算子
    • 1.reduce()
    • 2.collect()
    • 3.count()
    • 4.first()
    • 5.take()
    • 6.takeOrdered()
    • 7.aggregate()
    • 8.fold()
    • 9.countByKey()
    • 10.save算子
    • 11.foreach()


简介

RDD(Resilient Distributed Dataset)弹性分布式数据集

是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。

  • 弹性
    • 存储(内存和磁盘可以自动切换)
    • 计算(RDD有计算出错重试机制)
    • 容错(数据丢失可以自动恢复)
    • 分区
  • 分布式
    • 不同分区中的数据会分配给集群中的不同服务器节点进行计算
  • 数据集
    • 集合不一样,RDD只封装计算逻辑,不会保存数据
  • 数据抽象
    • RDD是抽象类,需要子类实现
  • 不可变
    • RDD封装计算逻辑,是不可变的,若要改变,需要产生新的RDD,在新的RDD里面封装新的计算逻辑

1.RDD的特性

  • 1)RDD有一组分区,通过一个分区计算函数getPartitions来获取分区
  • 2)分区计算函数 compute,可以把分区数据给取出来
  • 3)RDD之间的依赖,通过getDependence
  • 4)有分区器Partitioner(对于kv类型的RDD),可以控制分区的数据流向
  • 5)数据存储优先位置 getPreferedLocation

2.RDD创建方式

  • 1)通过内存集合创建
  • 2)通过读取外部文件创建
  • 3)通过RDD的转换算子得到新的RDD

3.创建RDD分区方式

  • 通过内存集合创建

    • 默认的分区方式(取决于分配给当前应用的CPU核数)

    • 指定分区

      	// i是集合下标,length是集合长度,numSlices是分区数
      	def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { 
            (0 until numSlices).iterator.map {  i =>
              val start = ((i * length) / numSlices).toInt
              val end = (((i + 1) * length) / numSlices).toInt
              (start, end)
            }
          }
      

  • 读取外部文件创建

    • 默认的分区方式(取决于分配给当前应用的CPU核数与2取最小值)
    • 指定分区
      • FileInputFormat里面有个getSplits(切片)方法
      • LineRecordReader里面有个next(按行读取)方法

4.RDD常用算子

  • 转换算子Transformation
    • 转换算子执行完毕后,会创建新的RDD,并不会马上执行计算
    • 1.map
      • 对RDD中的元素进行一个个映射
    • 2.mapPartitions
      • 以分区为单位,对RDD中的元素映射
    • 3.mapPartitionsWithIndex
      • 以分区为单位,对RDD中的元素进行映射,并且带分区编号
    • 4.flatMap
      • 对RDD中的元素进行扁平化处理(将整体拆分成个体)
    • 5.glom
      • 将RDD中每一个分区中的单个元素,转换为数组
    • 6.groupBy
      • 按照一定的分组规则,对RDD中的元素进行分组
    • 7.filter
      • 按照一定的规则,对RDD中的元素进行过滤
    • 8.sample
      • 参数1:是否抽样放回,如果是true,表示放回,否则不放回
      • 参数2:若参数1为true,那么参数2表示期望元素出现的次数(这里只是期望,并不代表实际次数),这个数大于等于零;参数1若为false,那么参数2表示每一个元素出现的概率,范围是[0, 1]
      • 参数3:随机算法的初始值(一般不用设置,若指定初始值,那么随机生成的数相等)
      • takeSample(行动算子):taskSample可以指定随机抽取的个数
    • 9.distinct
      • 去重,底层是通过map + reduceByKey完成的去重操作
    • 改变分区的两个算子
      • 10.coalesce
        • 一般用于缩减分区,默认情况下是不执行shuffle机制
      • 11.repartition
        • 一般用于扩大分区,底层调用的是coalesce,会将coalesce第二个参数设置为true,会执行shuffle
    • 12.sortBy
      • 按照指定规则,对RDD中的元素进行升序,默认升序(可通过参数来设置)
    • 13.pipe
      • 对于RDD中的每一个分区,都会执行pipe算子中指定的脚本
    • 14.Union
      • 合集
    • 15.intersection
      • 交集
    • 16subtract
      • 差集
    • 17.zip
      • 拉链,必须要保证两个RDD的分区数以及每个分区中元素的个数一致
    • 18.partitionBy
      • 按照指定的分区器,通过key对RDD中的元素进行分区
      • 默认分区器 HashPartitioner
    • 19.reduceByKey
      • 将相同的key放在一起,对Value进行聚合操作
    • 20.groupByKey
      • 按照key对RDD中的元素进行分组
    • 21.aggregateByKey(参数1:zeroValue)(参数2:分区内计算规则, 参数3:分区间计算规则)
    • 22.foldByKey(参数1:zereValue)(参数2:分区内和分区间计算规则)
      • 是aggregateByKey的简化,分区内和分区间计算规则相同
    • 23.combineByKey(参数1:对当前key的value进行转换, 参数2:分区内计算规则, 参数3:分区间计算规则)
    • 24.sortByKey
      • 按照RDD中的key对元素进行排序
    • 25.mapValues
      • 只对RDD中的Value进行操作
    • join&cogroup
  • 行动算子Action
    • 行动算子执行后,才会触发计算
    • 1.reduce
      • 对RDD中的元素进行聚合
    • 2.collect.foreach和foreach
      • 对RDD中的元素进行遍历
      • collect.foreach将每一个Excutor中的数据收集到Driver,形成一个新的数组
      • .foreach不是一个算子,是集合的方法,是对数组中的元素进行遍历
    • 3.count
      • 获取RDD中元素的个数
    • 4.countByKey
      • 获取RDD中每个key对应的元素个数
    • 5.first
      • 获取RDD中第一个元素
    • 6.take
      • 获取RDD中的前几个元素
    • 7.takeOrdered
      • 获取排序后的RDD中的前几个元素
    • 8.aggregate&fold
      • aggregateByKey 处理kv类型的RDD,并且在进行分区间聚合的时候,初始值不参与运算
      • aggregate处理kv类型的RDD,但是在分区间和分区内聚合时初始值都会参与运算
      • fold 是aggregate的简化版
    • 9.save相关的算子
      • saveAsTextFile
      • saveAsObjectFile
      • saveAsSequenceFile(只针对KV类型RDD)

Transformation转换算子

​ 当我们去调用的时候,会创建一个新的RDD,并不会真正执行计算操作,只是做一些计算逻辑的封装

  • RDD整体上分为:
    • Value类型
    • 双Value类型
    • Key-Value类型

Action行动算子

​ 只有行动算子被调用之后才会去做计算

​ 行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。

Transformation转换算子

value类型

1.map()

  • map(fun):返回一个新的RDD,该RDD由每一个输入元素经过fun函数转换后组成.

  • 当某个RDD执行map方法时,会遍历RDD中的每一个数据项,并依次应用fun函数,产生一个新的RDD

  • 新的RDD分区数和旧的RDD分区数相同

  • 代码示例

    	def main(args: Array[String]): Unit = { 
    
            // 创建SparkConf并设置App名称
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    
            // 创建SparkContext,该对象是提交Spark App的入口
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
            println("原分区个数" + rdd.partitions.size)
    
            val newRDD: RDD[Int] = rdd.map(_ * 2)
            println("新分区个数" + newRDD.partitions.size)
    
            newRDD.collect().foreach(println)
    
            // 关闭连接
            sc.stop()
        }
    -----------------------------------------------------------
    输出结果:
    原分区个数2
    新分区个数2
    2
    4
    6
    8
    

2.mapPartitions()

  • 以分区为单位执行Map算子

  • 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。

  • 假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。

  • 参数是一个可迭代的集合

  • map是一次处理一个元素,mapPartitions是一次处理一个分区的元素,mapPartitions适用于批处理操作

  • 代码示例(对RDD中的元素进行乘2操作)

    	def main(args: Array[String]): Unit = { 
    
            // 创建SparkConf并设置App名称
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    
            // 创建SparkContext,该对象是提交Spark App的入口
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    
            // 以分区为单位,对RDD中的元素进行映射
            val newRDD: RDD[Int] = rdd.mapPartitions(datas => { 
                datas.map(_ * 2)
            })
            
            newRDD.collect().foreach(println)
    
            // 关闭连接
            sc.stop()
        }
    ---------------------------------------
    输出结果
    2
    4
    6
    8
    

    适用于批处理操作
    比如将RDD中的元素插入到数据库中,需要数据库连接,如果每一个元素都创建一个连接,效率很低,可以对每个分区的元素创建一个连接

    mapPartitions每次处理一批数据,这个分区中数据处理完之后,原来的RDD中分区的数据才会释放,可能会导致内存溢出

3.mapPartitionsWithIndex()

两个参数,一个是当前分区的编号,另外一个是分区的数据(可以单独对某一个分区进行操作)

  • 代码示例

    • 需求1

    • 对RDD元素做映射,将元素变为元组,元组里面分别表示分区编号和当前元素

      	def main(args: Array[String]): Unit = { 
      
              // 创建SparkConf并设置App名称
              val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
      
              // 创建SparkContext,该对象是提交Spark App的入口
              val sc: SparkContext = new SparkContext(conf)
      
              val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
      
              val newRDD: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(
                  (index, datas) => { 
                      datas.map((index, _))
                  }
              )
              newRDD.collect().foreach(println)
              // 关闭连接
              sc.stop()
          }
      -----------------------------------------------------------------
      输出结果
      (0,1)
      (0,2)
      (1,3)
      (1,4)
      
    • 需求2

    • 第二个分区乘2,其他分区不变

      	val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 3)
          val newRDD: RDD[Int] = rdd.mapPartitionsWithIndex(
              (index, datas) => { 
                  index match { 
                      case 1 => datas.map(_ * 2)
                      case _ => datas
                  }
              }
          )
      	newRDD.collect().foreach(println)
      
      --------------------------------------
      输出结果
      1
      2
      6
      8
      10
      6
      7
      8
      

4.flatMap()

功能与map相似,将RDD中的每一个元素通过应用f函数依次转换为新函数,并封装到RDD中

跟map的区别在于:

  • flatMap中f函数返回值是一个集合,将RDD集合中每一个元素拆分出来放到新的RDD当中

代码示例

  • 需求

    • 对集合进行扁平化处理,将一个大的集合(里面有子集合)里面数据取出,放入一个大的集合当中

      	def main(args: Array[String]): Unit = { 
      
              // 创建SparkConf并设置App名称
              val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
      
              // 创建SparkContext,该对象是提交Spark App的入口
              val sc: SparkContext = new SparkContext(conf)
      
              val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6), List(7, 8)), 2)
      
              // 如果匿名函数输入和输出相同,那么不能进行简化
              val newRDD: RDD[Int] = rdd.flatMap(datas => datas)
              newRDD.collect().foreach(println)
              // 关闭连接
              sc.stop()
          }
      
      ----------------------------------------------
      1
      2
      3
      4
      5
      6
      7
      8
      

5.glom()

def glom(): RDD[Array[T]]

将RDD中每一个分区中元素组合成一个数组,封装到新的RDD当中,数组中元素的类型保持不变

(flatMap是将整体转换成个体,glom是将个体转换成整体)

代码示例

  • 需求

    • 将RDD中每个分区里的数据放到数组当中

      def main(args: Array[String]): Unit = { 
              val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
              
              val sc: SparkContext = new SparkContext(conf)
      
              var rdd: RDD[Int] =sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
      
              println("----------没有进行glom之前--------------")
              rdd.mapPartitionsWithIndex(
                (index, datas) => { 
                    println("当前分区:" + index + " " + "元素:"+ datas.mkString(","))
                    datas
                }
              ).collect()
      
              println("----------调用glom之后--------------")
              val newRDD: RDD[Array[Int]] = rdd.glom()
              newRDD.mapPartitionsWithIndex(
                  (index, datas) => { 
                      println("当前分区:" + index + " " + "元素:"+ datas.next().mkString(","))
                      datas
                  }
              ).collect()
              
              sc.stop()
          }
      -------------------------------------------
      
      输出结果
      ----------没有进行glom之前--------------
      当前分区:1 元素:4,5,6
      当前分区:0 元素:1,2,3
      ----------调用glom之后--------------
      当前分区:0 元素:1,2,3
      当前分区:1 元素:4,5,6
      

6.groupBy()

def groupBy[K](f T => K) (implicit kt: ClassTag[K]): RDD[(k, Iterable[T])]

分组,按照传入函数的返回值进行分组,相同key对应的值放入一个迭代器(按照指定的规则,对RDD中的元素进行分组)

groupBy会存在shuffle的过程进行落盘

代码示例

  • 需求

    • 将RDD中奇数放在一组,偶数放到另一组

      	def main(args: Array[String]): Unit = { 
              val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
      
              val sc: SparkContext = new SparkContext(conf)
      
              val rdd: RDD[Int] =sc.makeRDD(1 to 9, 3)
              println("-------------执行分组前----------------")
              rdd.mapPartitionsWithIndex(
                  (index, datas) => { 
                      println(s"分区号:$index " + "元素:" + datas.mkString(","))
                      datas
                  }
              ).collect()
      
              val newRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
              println("-------------执行分组后----------------")
              newRDD.mapPartitionsWithIndex(
                  (index, datas) => { 
                      println(s"分区号:$index " + "元素:" + datas.mkString(","))
                      datas
                  }
              ).collect()
      
              sc.stop()
          }
      
      --------------------------------------------
      输出结果
      
      -------------执行分组前----------------
      分区号:0 元素:1,2,3
      分区号:2 元素:7,8,9
      分区号:1 元素:4,5,6
      -------------执行分组后----------------
      分区号:2 元素:
      分区号:1 元素:(1,CompactBuffer(1, 3, 5, 7, 9))
      分区号:0 元素:(0,CompactBuffer(2, 4, 6, 8))
      
    • 给出一个RDD,将相同的单词放到一组

      val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Peanut", "Hadoop", "Hive"))
      val newRDD: RDD[(String, Iterable[String])] = rdd.groupBy(elem => elem)
      newRDD.collect().foreach(println)
      
      -------------------------------------------
      输出结果
      (Hive,CompactBuffer(Hive))
      (Peanut,CompactBuffer(Peanut, Peanut))
      (Hadoop,CompactBuffer(Hadoop, Hadoop))
      

用以上的算子来实现一个简单版的WordCount

List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark")

首先从集合中读取数据,用flatMap对RDD中的数据进行扁平化处理,通过map来对每一个单词设置为元组,每个单词对应的数为1,使用groupBy将相同的key放到一组,对相同的单词进行累加

  • 实现方式1

    	def main(args: Array[String]): Unit = { 
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark"))
    
            // 对RDD中的元素进行扁平化映射
            val flatMap: RDD[String] = rdd.flatMap(_.split(" "))
    
            // 将映射后的数据进行结构转换,为每个单词进行计数
            val mapRDD: RDD[(String, Int)] = flatMap.map((_, 1))
    
            // 按照key对RDD中的元素进行分组
            val groupByRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupBy(_._1)
    
            // 对分组后的元素再进行映射
            val resRDD: RDD[(String, Int)] = groupByRDD.map({ 
                case (word, datas) => { 
                    (word, datas.size)
                }
            })
    
            resRDD.collect().foreach(println)
            sc.stop()
        }
    
    ------------------------------------------------
    输出结果
    (Peanut,1)
    (Spark,3)
    (Hadoop,2)
    
  • 实现方式2

    • 其实在方式1当中,进行扁平化处理之后,不需要再将结果映射成map,可以直接对结果按照key进行groupBy,最后用map统计相同key的长度即为结果
    def main(args: Array[String]): Unit = { 
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark"))
            
        	// 对RDD中的元素进行扁平化映射
            val flatMap: RDD[String] = rdd.flatMap(_.split(" "))
            // 将RDD中的单词进行分组
            val groupByRDD: RDD[(String, Iterable[String])] = flatMap.groupBy(word => word)
            // 对分组后的元素再进行映射
            val resRDD: RDD[(String, Int)] = groupByRDD.map({ 
                case (word, datas) => { 
                    (word, datas.size)
                }
            })
            resRDD.collect().foreach(println)
            sc.stop()
        }
    ------------------------------------------------
    输出结果
    (Peanut,1)
    (Spark,3)
    (Hadoop,2)
    

用以上的算子来实现一个复杂版的WordCount

数字代表出现的次数

List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)
  • 方式1
    • 先用map对其进行字符串拼接
    • 再进行扁平化处理,对结果按照key进行groupBy,最后用map统计相同key的长度即为结果
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
       
    	val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)))
    
        // 将原来RDD中字符串以及字符串出现的次数进行处理,形成一个新的字符串
        val rdd1: RDD[String] = rdd.map{ 
            case (str, count) => { 
                (str + " ") * count
            }
        }
        println("输出当前rdd1")
        rdd1.collect().foreach(println)
        println("-------------------------------------")
        // 对RDD中的元素进行扁平化映射
        val flatMap: RDD[String] = rdd1.flatMap(_.split(" "))
        // 将RDD中的单词进行分组
        val groupByRDD: RDD[(String, Iterable[String])] = flatMap.groupBy(word => word)
        // 对分组后的元素再进行映射
        val resRDD: RDD[(String, Int)] = groupByRDD.map({ 
            case (word, datas) => { 
                (word, datas.size)
            }
        })
        resRDD.collect().foreach(println)
        sc.stop()
    }
-------------------------------------------------------------------
输出结果
输出当前rdd1
Peanut Spark Peanut Spark 
Hello Spark Hello Spark Hello Spark 
Hello World Hello World 
-------------------------------------
(Peanut,2)
(Hello,5)
(World,2)
(Spark,5)
  • 方式2
    • 将元素转换成元组,然后进行分组统计
    • (“Peanut Spark”, 2) ----> (“Peanut”, 2) (“Spark”, 2)
    def main(args: Array[String]): Unit = { 
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)))
        val flatMapRDD: RDD[(String, Int)] = rdd.flatMap { 
                case (words, count) => { 
                    // 对多个单词组成的字符串进行切割
                    words.split(" ").map((_, count))
                }
            }

            // 按照单词对RDD中的元素进行分组
            val groupByRDD: RDD[(String, Iterable[(String, Int)])] = flatMapRDD.groupBy(_._1)

            val resRDD: RDD[(String, Int)] = groupByRDD.map{ 
                case (word, datas) =>
                    (word, datas.map(_._2).sum)
            }

            resRDD.collect().foreach(println)
    }
--------------------------------------------------------------------------------
输出结果
(Peanut,2)
(Hello,5)
(World,2)
(Spark,5)

7.filter()

def filter(f T=> Boolean): RDD[T]

RDD调用filter时,对RDD中的每一个元素应用f函数,将返回值为true的元素添加到新的RDD当中

按照指定的过滤规则,对RDD中的元素进行过滤

代码示例

  • 需求1

    List("Peanut", "zhangsan", "lisi", "wangwu", "xiaoliu", "xiaowang")
    
    • 将集合中包含"xiao"的字符串提取出来
    	def main(args: Array[String]): Unit = { 
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[String] = sc.makeRDD(List("Peanut", "zhangsan", "lisi", "wangwu", "xiaoliu", "xiaowang"))
    
            val newRDD: RDD[String] = rdd.filter(_.contains("xiao"))
            newRDD.collect().foreach(println)
    
            sc.stop()
        }
    
    -------------------------------------------
    输出结果
    xiaoliu
    xiaowang
    
  • 需求2

    List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    
    • 将所有的偶数过滤出来
    	def main(args: Array[String]): Unit = { 
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
            // 把所有的奇数过滤出来
            val newRDD: RDD[Int] = rdd.filter(_ % 2 == 0)
            newRDD.collect().foreach(println)
            sc.stop()
        }
    
    -------------------------------------------------------
    输出结果
    2
    4
    6
    8
    

8.sample()

随机抽样

def sample(
	withReplacement: Boolean, // true为有放回的抽样,false为无放回抽样
    fraction: Double,   // 当withReplacement = false时,表示每个元素的概率,范围是[0,1]
    					// 当withReplacement = true时,表示每个元素的期望次数,范围是 >= 0
    seed: Long = Untils.random.nextLong  // send表示随机数生成器种子(生成随机数的初始值,种子一样时,生成的随机数相同)抽样算法的初始值,一般不需要指定
): RDD[T]

代码示例

  • 需求1
    • 抽样放回
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(1 to 10)

        val newRDD: RDD[Int] = rdd.sample(true, 1)
        newRDD.collect().foreach(println)
        sc.stop()
    }
-----------------------------------------------------
输出结果(每次执行的结果都不同)
1
2
2
2
2
5
5
6
6
6
7
10
10
10

最后的结果是随机的,第二个参数设置期望出现次数为1,只是期望,并不代表每个数字都出现一次。

  • 需求2
    • 抽样不放回
    • 那么此时第二个参数代表的就是每个数出现的概率,如果设为1,则每个数都出现一次
    • 若为0,则都不出现
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(1 to 10)

        // 从RDD中随机抽取数据(抽样不放回)
        val newRDD: RDD[Int] = rdd.sample(false, 0.3)
        newRDD.collect().foreach(println)
        sc.stop()
    }

---------------------------------------------
3
5
7

takeSample可以在抽取时指定抽取的个数(在第二个参数中指定抽取的个数)

  • 小案例
    • 从名单中抽取三名幸运观众
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        val sc: SparkContext = new SparkContext(conf)

        val stds = List("张1", "张2", "张3", "张4", "张5", "张6", "张7", "张8", "张9",
            "张10", "张11", "张12", "张13", "张14", "张15", "张16",
            "张17", "张18", "张19", "张21", "张20", "张22")
    
        val nameRDD: RDD[String] = sc.makeRDD(stds)
    
        // 从上面RDD当中抽取一名幸运观众
        val res: Array[String] = nameRDD.takeSample(false, 3)
        res.foreach(println)
        sc.stop()
    }
------------------------------------------
输出结果
张18131

9.distinct()

去重算子

def distinct(): RDD[T]

distinct算子是对内部元素进行去重,将去重后的结果放到新的RDD当中

默认情况下,distinct会生成与原分区个数一致的分区数

distinct有两种,一种是不加参数,一种是可以指定分区数

代码示例

  • 需求1
    • 对集合中的数去重,不指定分区数
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        // 创建RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 4, 3, 3), 5)
        // 对RDD中数据进行去重
        val newRDD: RDD[Int] = rdd.distinct()
        newRDD.collect().foreach(println)
        sc.stop()
    }

-------------------------------------
输出结果
5
1
2
3
4
  • 需求2
    • 对集合中的数据去重,指定分区数
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        // 创建RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 4, 3, 3), 5)

        rdd.mapPartitionsWithIndex{ 
            (index, datas) =>
                println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
                datas
        }.collect()

        println("------------去重前后分区对比----------------")
        
    	// 对RDD中数据进行去重
        val newRDD: RDD[Int] = rdd.distinct(2)

        newRDD.mapPartitionsWithIndex{ 
            (index, datas) =>
                println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
                datas
        }.collect()
        sc.stop()
    }

--------------------------------------------------------
输出结果
分区:4---> 分区元素:3,3
分区:1---> 分区元素:3,4
分区:0---> 分区元素:1,2
分区:2---> 分区元素:5,5
分区:3---> 分区元素:4,4
------------去重前后分区对比----------------
分区:1---> 分区元素:1,3,5
分区:0---> 分区元素:4,2

10.coalesce()

coalesce算子可以重新分区

def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
 : RDD[T]

代码示例

  • 需求1
    • 缩减分区
    • 默认情况下,如果使用coalesce扩大分区是不起作用的,因为底层没有执行shuffle,如果扩大分区,使用reparation
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        // 创建RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)

        rdd.mapPartitionsWithIndex{ 
            (index, datas) =>
                println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
                datas
        }.collect()

        println("*****************缩减分区前后********************")
        // 缩减分区
        val newRDD: RDD[Int] = rdd.coalesce(2)
        newRDD.mapPartitionsWithIndex{ 
            (index, datas) =>
                println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
                datas
        }.collect()
        sc.stop()
    }

----------------------------------------------------------
分区:0---> 分区元素:1,2
分区:2---> 分区元素:5,6
分区:1---> 分区元素:3,4
***************缩减分区前后**********************
分区:0---> 分区元素:1,2
分区:1---> 分区元素:3,4,5,6

11.reparation()

重新分区,扩大分区时,可以使用reparation,reparation会执行shuffle

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        // 创建RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)

        rdd.mapPartitionsWithIndex{ 
            (index, datas) =>
                println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
                datas
        }.collect()

        // 使用reparation扩大分区
        val newRDD: RDD[Int] = rdd.repartition(4)
        println("******************扩大分区前后*******************")
        newRDD.mapPartitionsWithIndex{ 
            (index, datas) =>
                println("分区:"+ index + "---> 分区元素:" + datas.mkString(","))
                datas
        }.collect()
        sc.stop()
    }

-----------------------------------
输出结果
分区:2---> 分区元素:5,6
分区:1---> 分区元素:3,4
分区:0---> 分区元素:1,2
******************扩大分区前后*******************
分区:2---> 分区元素:
分区:1---> 分区元素:
分区:3---> 分区元素:1,3,5
分区:0---> 分区元素:2,4,6

repartition的底层其实是调用的coalesce,只不过在调用时,第二个参数指定执行shuffle机制

请看源码:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { 
    coalesce(numPartitions, shuffle = true)
}
  • coalesce
    • 第二个参数默认为false,底层默认不执行shuffle
    • 一般用于缩减分区
  • repartition
    • 底层调用coalesce,调用coalesce时第二个参数为true,执行shuffle
    • 一般用于扩大分区

12.sortBy()

排序

对RDD中的元素进行排序,排序时需要指定排序规则

def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,  // 默认为正序排序
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 

代码示例

  • 需求1
    • 升序排序
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 创建RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 4, 6, 5, 3, 2))

        // 升序排序
        val sortedRDD: RDD[Int] = rdd.sortBy(num => num)
        sortedRDD.collect().foreach(println)
        sc.stop()
    }
-------------------------------------------------------
输出结果
1
2
3
4
5
6
  • 需求2
    • 降序排序(sortBy的第二个参数需要指定为false)
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 创建RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 4, 6, 5, 3, 2))

        // 降序排序
        val sortedRDD: RDD[Int] = rdd.sortBy(num => num, false)
        
        sortedRDD.collect().foreach(println)
        sc.stop()
    }
-----------------------------------------------------------------
输出结果
6
5
4
3
2
1
  • 需求3
    • 对字符串进行排序
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        
        val strRDD: RDD[String] = sc.makeRDD(List("1", "6", "2", "10"))
        
        println("-----------对字符串按字典序排序-------------")
        // 按照字符串字典顺序进行排序
        val sortedRDD: RDD[String] = strRDD.sortBy(elem => elem)
        sortedRDD.collect().foreach(println)
        
        println("------------将字符串按照字面的数字来进行排序---------------")
        // 按照字符串转换为整数后的大小进行排序
        val sortedRDD2: RDD[String] = strRDD.sortBy(_.toInt)
        sortedRDD2.collect().foreach(println)
        
        sc.stop()
    }

-----------------------------------------
输出结果
-----------对字符串按字典序排序-------------
1
10
2
6
------------将字符串按照字面的数字来进行排序---------------
1
2
6
10

13.pipe()

def pipe(command: String): RDD[String]

针对每一个分区,都调用一次shell脚本,返回输出的RDD

双value类型

1.union()并集

def union(other: RDD[T]): RDD[T] 

代码示例

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
        val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
        // 求并集
        val newRDD: RDD[Int] = rdd1.union(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
-------------------------------------
1
2
3
4
3
4
5
6
7

2.intersection()交集

def intersection(other: RDD[T]): RDD[T]

代码示例

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
        val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
     	// 求交集
        val newRDD: RDD[Int] = rdd1.intersection(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
---------------------
输出结果
3
4

3.subtract ()差集

def subtract(other: RDD[T]): RDD[T]

代码示例

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
        val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
		// 求差集
        val newRDD: RDD[Int] = rdd1.subtract(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
--------------------------------------------
输出结果
1
2

4. zip()拉链

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

拉链的功能是将两个RDD中的元素合并成元组,元组中key为第一个RDD中的元素,value为第二个RDD中的元素

要求两个RDD的分区数量以及元素数量都相等,否则会抛异常

代码示例

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
        val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))
      
        // 拉链
        val newRDD: RDD[(Int, Int)] = rdd1.zip(rdd2)

        newRDD.collect().foreach(println)
        sc.stop()
    }
----------------------------------------
输出结果
(1,3)
(2,4)
(3,5)
(4,6)

Key-Value类型

1.partitionBy()

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

按照key从新分区,将RDD[K, V]中的K,按照指定Partitioner重新进行分区如果原有的partitionRDD和现有partitionRDD是一致的话,就不进行分区,否则会产生shuffe过程。

代码示例

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 创建RDD
        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c")), 3)
        rdd.mapPartitionsWithIndex(
          (index, datas) =>{ 
            println("分区号:" + index + "---->" + "分区数据" + datas.mkString(","))
            datas
          }
        ).collect()

        println("**************分区前后对比*****************")
        val newRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
        newRDD.mapPartitionsWithIndex(
            (index, datas) =>{ 
                println("分区号:" + index + "---->" + "分区数据" + datas.mkString(","))
                datas
            }
        ).collect()

        sc.stop()
    }
----------------------------------------
输出结果
分区号:0---->分区数据(1,a)
分区号:2---->分区数据(3,c)
分区号:1---->分区数据(2,b)
**************分区前后对比*****************
分区号:1---->分区数据(1,a),(3,c)
分区号:0---->分区数据(2,b)

自定义分区器

class MyPartitioner(partitions: Int) extends Partitioner { 
    // 获取分区个数
    override def numPartitions: Int = partitions

    // 指定分区规则 返回值Int表示分区编号,从0开始
    override def getPartition(key: Any): Int = { 
        // 此时返回为1,那么所有的元素都会被分到1号区
        1
    }
}
------------------------------------------------------------------------------val newRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))new HashPartitioner(2)换成new MyPartitioner(2)
输出结果
分区号:1---->分区数据(2,b)
分区号:0---->分区数据(1,a)
分区号:2---->分区数据(3,c)
**************分区前后对比*****************
分区号:0---->分区数据
分区号:1---->分区数据(1,a),(2,b),(3,c)

2.reduceByKey()

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

reduceByKey可以将RDD[K, V]中的元素按照相同的K对V进行聚合

可以指定新的RDD中分区器或分区数

代码示例

  • 将集合中相同key的元组进行聚合
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 4), ("b", 3)))
        val resRDD: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
        resRDD.collect().foreach(println)
        sc.stop()
    }
--------------------------------------------------------------
输出结果
(a,5)
(b,5)

3.groupByKey()

def groupByKey(): RDD[(K, Iterable[V])]

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

groupByKey对集合中的每个key进行操作,会将相同key对应的value放到一个集合当中,不进行聚合

可以指定新的RDD中分区器(默认使用的分区器是HashPartitioner)或分区数

代码示例

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 2), ("a", 4), ("b", 3)))
        val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
        groupRDD.collect().foreach(println)
        sc.stop()
    }
---------------------------------
输出结果
(a,CompactBuffer(5, 4))
(b,CompactBuffer(2, 3))
  • wordcount案例
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 2), ("a", 4), ("b", 3)))
        val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
        
        val resRDD = groupRDD.map { 
            case (key, datas) => { 
                (key, datas.sum)
            }
        }
        resRDD.collect().foreach(println)
        sc.stop()
    }

---------------------------------------------------------------
输出结果
(a,9)
(b,5)
  • reduceByKey用来做聚合
  • groupByKey用来做分组

4.aggregateByKey()

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)]

aggregateByKey是按照key对分区内以及分区间的数据进行处理

aggregateByKey(参数1)(参数2, 参数3)

  • 参数1: 初始值

  • 参数2: 分区内计算规则

  • 参数3: 分区间计算规则

代码示例

  • 需求1
    • 用aggregateByKey实现wordCount
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 创建RDD
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("c", 5), ("b", 5), ("c", 1), ("b", 2)), 2)

        // aggregateByKey实现wordCount
        val newRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(_ + _, _ + _)
        newRDD.collect().foreach(println)

        sc.stop()
    }

----------------------------------------------
输出结果
(b,7)
(a,6)
(c,6)
  • 需求2
    • 对每个分区内的最大值求和
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 创建RDD
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("c", 5), ("a", 2), ("a", 3), ("b", 5), ("a", 3), ("c", 3), ("b", 2)), 2)

        rdd.mapPartitionsWithIndex(
            (index, datas) => { 
              println("分区:" + index + "---> 分区数据:" + datas.mkString(","))
              datas
            }
        ).collect()
        println("求分区最大值")
        val newRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(
            // 分区内的计算规则,找出最大值,max在进行比较时,首先是将第一个元素跟初始值比较
            (x, y) => math.max(x, y),
            // 分区间计算规则
            (a, b) => a + b
        )
        
        newRDD.collect().foreach(println)

        sc.stop()
    }

-----------------------------------------------------
输出结果
分区:0---> 分区数据:(c,5),(a,2),(a,3)
分区:1---> 分区数据:(b,5),(a,3),(c,3),(b,2)
求分区最大值
(b,5)
(a,6)
(c,8)

当分区内计算逻辑和分区间计算逻辑不一样时,可以考虑使用aggregateByKey

5. foldByKey()

def foldByKey(
      zeroValue: V,
      partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  • 参数含义
    • zeroValue代表初始值,可以取任意类型
    • func 代表一个函数
    • Partitioner 分区器
    • numPartitions 分区数

foldByKey是aggregateByKey的简化版本,如果分区内和分区间计算逻辑相同,那么可以使用foldByKey,foldByKey和reduceByKey之间的差别在于foldKey可以指定初始值

代码示例

  • 需求
    • 按照key对分区内和分区间的数据进行相加(用foldByKey实现wordCount)
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 创建RDD
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("c", 2), ("a", 1), ("a", 2), ("b", 5), ("a", 3), ("c", 3), ("b", 2)), 2)
        val newRDD: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _)
        newRDD.collect().foreach(println)
        sc.stop()
    }

------------------------------------------------------
(b,7)
(a,6)
(c,5)

6.combineByKey()

转换结构后分区内和分区间操作

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)]

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)]

参数说明:

  • createCombiner

    • 对RDD中当前key取出第一个value做初始化,把当前的值作为参数,对该值可以进行一些转换操作,转换成我们想要的格式,比如读进来(10),可以转换为(10, 1)
  • mergeValue

    • 分区内的计算规则,会将当前分区的value值(除去第一个value剩下的value),合并到初始化得到的c上面
  • mergeCombiners

    • 分区间的计算规则

combineByKey算子的使用场景

代码示例

List(("LaoWang", 90), ("LaoLiu", 80), ("LaoWang", 85), ("LaoLiu", 60)

List中表示的是学生姓名和成绩,一个学生有不同科目的成绩

  • 需要求出每个学生的平均成绩
    • 先对读进来的数据进行溢写格式转换,比如将"LaoWang"对应的90转换为(90, 1),加上一个1是为了后面方便计算每个同学总共的科目数量
    • 然后在分区内对分数相加,每加一次,课程门数加1
    • 分区间,将分数以及课程门数分别相加
    • 得到每个同学的总成绩以及课程数量之后,通过map来求平均值
def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 需求,求出每一个学生的平均成绩
        // 创建RDD
        val scoreRDD: RDD[(String, Int)] = sc.makeRDD(List(("LaoWang", 90), ("LaoLiu", 80), ("LaoWang", 85), ("LaoLiu", 60)), 2)

        val combineRDD: RDD[(String, (Int, Int))] = scoreRDD.combineByKey(
            // 初始化
            (_, 1),
            // 分区内计算规则 将分数相加,每加一次,课程门数加1
            (t1: (Int, Int), v) => { 
                (t1._1 + v, t1._2 + 1)
            },
            // 分区间计算规则,将分数以及课程门数分别相加
            (tup2: (Int, Int), tup3: (Int, Int)) => { 
                (tup2._1 + tup3._1, tup2._2 + tup2._2)
            }
        )

        // 求平均成绩
        val resRDD: RDD[(String, Int)] = combineRDD.map { 
            case (name, (score, count)) => { 
                (name, score / count)
            }
        }
        resRDD.collect().foreach(println)
        sc.stop()
    }

-------------------------------------------------------------------------------------
输出结果
(LaoLiu,70)
(LaoWang,87)

7.几种ByKey的对比

  • 如果分区内和分区间计算逻辑相同,并且不需要指定初始值,那么优先使用reduceByKey
  • 如果分区内和分区间计算逻辑相同,并且需要指定初始值,那么优先使用foldByKey
  • 如果分区内和分区间计算逻辑相同,并且需要指定初始值,那么优先使用aggregateByKey
  • 需要对读入的RDD中数据进行格式转换时,并且要处理分区内和分区间的逻辑,那么优先使用combineByKey

8.sortByKey()

按照K进行排序

在一个(K, V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)]

参数说明

  • 第一个参数代表排序规则,true为默认,默认为升序
  • 第二个参数为分区数,默认跟当前情况下分区数相同

代码示例

  • 按照key升序排序
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 创建RDD
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((2, "aa"), (6, "dd"), (4, "bb"), (1, "cc")))
        // 按照key对RDD中的元素进行排序 Int已经默认实现了Ordered接口
        val newRDD: RDD[(Int, String)] = rdd.sortByKey()
        newRDD.collect().foreach(println)

        sc.stop()
    }
-------------------------------------------------------------
(1,cc)
(2,aa)
(4,bb)
(6,dd)

如果想降序排序,只需在方法里设置第一个参数为false即可

val newRDD: RDD[(Int, String)] = rdd.sortByKey(false)
  • 如果key为自定义类型,要求必须混入Ordered特质
    • 自定义一个Student类,要求先按照名称升序,如果名称相同的话,再按照年龄降序
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val stdList: List[(Student, Int)] = List(
            (new Student("zhangsan", 18), 1),
            (new Student("lisi", 12), 1),
            (new Student("wangwu", 20), 1),
            (new Student("wangwu", 23), 1)
        )
        val stdRDD: RDD[(Student, Int)] = sc.makeRDD(stdList)
        val resRDD: RDD[(Student, Int)] = stdRDD.sortByKey()
        resRDD.collect().foreach(println)
        sc.stop()
    }
-----------------------------------------------------------------------------
输出结果
(Student(lisi, 12) ,1)
(Student(wangwu, 23) ,1)
(Student(wangwu, 20) ,1)
(Student(zhangsan, 18) ,1)

Student类定义如下

class Student(var name: String, var age: Int) extends Ordered[Student] with Serializable { 
    // 指定比较规则
    override def compare(that: Student): Int = { 
        // 先按照名称升序,如果名称相同的话,再按照年龄降序
        var res: Int = this.name.compareTo(that.name)
        if (res == 0) { 
            // res 等于0 说明名字相同,此时按照年龄降序排序
            res = that.age - this.age
        }
        res
    }

    override def toString: String = s"Student($name, $age) "
}

9.mapValues()

def mapValues[U](f: V => U): RDD[(K, U)]
对kv类型的RDD中的value部分进行映射

代码示例

  • 需求
    • 对RDD中的value添加字符串" --> "
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d")))
        val newRDD: RDD[(Int, String)] = rdd.mapValues(" --> " + _)
        newRDD.collect().foreach(println)
        sc.stop()
    }

----------------------------------------------------
输出结果
(1, --> a)
(2, --> c)
(1, --> b)
(3, --> d)

10. join()

我们先来回顾一下SQL中的连接

  • 连接
    • 两张表或者多张表结合在一起获取数据的过程
  • 内连接
    • 两张表进行连接查询,将两张表中完全匹配的记录查询出来
    • 等值连接(某个属性值相等)
      • =
    • 非等值连接(确定一个范围)
      • between and …
    • 自连接(本张表和本张表连接)
  • 外连接
    • 两张表进行连接查询,将其中一张表的数据全部查询出来,如果另外一张表中有数据无法与其匹配,则会自动模拟出空值与其进行匹配
    • 左(外)连接
      • 左连接是左边表的所有数据都有显示出来,右边的表数据只显示共同有的那部分,没有对应的部分补空显示
    • 右(外)连接
      • 右连接和左连接相反的
    • 全连接
      • 返回两个表中的所有的值,没有对应的数据则输出为空

join()是将相同key对应的多个value关联在一起

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] 

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

对于两个RDD ,类型分别为(k, v)和(k, w),调用join算子时,返回一个相同key对应的所有元素组合在一起的RDD(k, (v, w)),简单点来说,就是k不变,将value从新组合放到一起

如果key只是某一个RDD中有,那么这个key不会关联,类似于内连接,将两张表完全匹配的数据给显示出来

代码示例

  • 将两个RDD进行join
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
        val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))
		// join算子相当于内连接,将两个RDD中的key相同的数据匹配,如果key匹配不上,那么数据不关联
        val newRDD: RDD[(Int, (String, Int))] = rdd.join(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
---------------------------------------------------------------------------
输出结果
(1,(a,3))
(1,(a,2))
(1,(b,3))
(1,(b,2))
(2,(c,5))
(3,(d,3))
  • 左外连接 leftOuterJoin(对于没有匹配上的,会用空值进行匹配)
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
        val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))

        val newRDD: RDD[(Int, (String, Option[Int]))] = rdd.leftOuterJoin(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
-----------------------------------------------------------------------
(1,(a,Some(3)))
(1,(a,Some(2)))
(1,(b,Some(3)))
(1,(b,Some(2)))
(2,(c,Some(5)))
(3,(d,Some(3)))
(4,(e,None))

11.cogroup()

类似于全连接,但是在同一个RDD中对key聚合
操作两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。

举例

  • RDD1(List(1,“a”), (1,“b”))
  • RDD2 (List(1, “c”),(1,“d”))
  • cogruop之后,结果为(1,[“a”,“b”],[“c”,“d”])

代码示例

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
        val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))

        // cogroup
        val newRDD: RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd.cogroup(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
---------------------------------------------------------------
输出结果
(1,(CompactBuffer(a, b),CompactBuffer(3, 2)))
(2,(CompactBuffer(c),CompactBuffer(5)))
(3,(CompactBuffer(d),CompactBuffer(3)))
(4,(CompactBuffer(e),CompactBuffer()))

2.5Action行动算子

1.reduce()

def reduce(f: (T, T) => T): T

聚合

f函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据

代码示例

	 def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
        // reduce
        val res: Int = rdd.reduce(_ + _)
        println(res)
        sc.stop()
    }
-----------------------------------------------------
输出结果
10

2.collect()

def collect(): Array[T] 

以数组Array的形式返回数据集的所有元素

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

        
        rdd.foreach(println)
        println("-------------------------")

        val ints: Array[Int] = rdd.collect()
        ints.foreach(println)

        sc.stop()
    }
----------------------------------------------------
输出结果
1
3
2
4
-------------------------
1
2
3
4

注意:虽然用rdd直接调用foreach进行输出和先调用collect再进行输出,结果都可以出来,但是调用collect之后,会把数据先收集到driver端,数据是全部存到一个数组当中;而不调用直接输出时,不同分区的数据是在不同的Executor并行输出,我们会看到没调用collect进行输出的话,元素的顺序可能会跟原顺序不一样

3.count()

计数,返回RDD中元素的个数

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

代码示例

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
        val res: Long = rdd.count()
        println(res)
        sc.stop()
    }
---------------
输出结果
4

4.first()

返回RDD中的第一个元素

def first(): T

代码示例

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

        // first 返回RDD中的第一个元素
        val res: Int = rdd.first()
        println(res)
        sc.stop()
    }
-------------------
输出结果
1

5.take()

take算子可以返回由RDD前n个元素组成的数组

def take(num: Int): Array[T]

代码示例

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
        // take 返回rdd前n个元素组成的数组
        val res: Array[Int] = rdd.take(3)
        println(res.mkString(","))
        sc.stop()
    }
-----------------------------------------------
输出结果
1,2,3

6.takeOrdered()

返回RDD排序后前n个元素组成的数组

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

代码示例

 	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(3, 1, 2, 5, 4), 2)

        // takeOrdered(n) 获取rdd排序后前n个元素 
        val ans: Array[Int] = rdd.takeOrdered(3)
        println(ans.mkString(","))
        sc.stop()
    }
----------------------------------------------
输出结果
1,2,3

7.aggregate()

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  • 参数解释
    • 第一个参数是设置的初始值,这个初始值在分区内和分区间都会用到
    • 第二个参数是分区内的操作逻辑
    • 第三个参数是分区间的操作逻辑

在进行分区内的逻辑时,每个分区会通过分区内逻辑和初始值进行聚合(例如:初始值为1,分区计算逻辑为相加,那么分区内的元素聚合时,最后的结果还要加一次初始值1),然后分区间也会通过分区间逻辑和初始值进行操作。

代码示例

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
        rdd.mapPartitionsWithIndex(
            (index, datas) => { 
                println("分区号:" + index + "-> 分区数据:" + datas.mkString(","))
                datas
            }
        ).collect()
        // agreegate
        val res: Int = rdd.aggregate(1)(_ + _, _ + _)
        println(res)
        sc.stop()
    }
---------------------------------------------------
输出结果
分区号:4-> 分区数据:
分区号:2-> 分区数据:
分区号:1-> 分区数据:1
分区号:5-> 分区数据:3
分区号:7-> 分区数据:4
分区号:0-> 分区数据:
分区号:6-> 分区数据:
分区号:3-> 分区数据:2
19

这里有点不太好理解,我解释一下代码,代码首先是一个集合,数据在八个分区中,由输出结果可看出,1,3,5,7号分区中有数据,分区里的逻辑是相加,相加时每个分区进行分区间的逻辑操作(相加)时,分区里元素相加的结果还要加上初始值1

然后分区间的逻辑也是相加,分区间相加时也会加上初始值

总共8个分区,每个分区都要加一次初始值1,总共要加8,所有分区里的原始数据相加总共是10,那么现在总共是18。分区间在进行相加操作时,还要加一次初始值,即为结果19。

读者可以通过修改rdd分区数以及初始值来体会这个算子。

8.fold()

def fold(zeroValue: T)(op: (T, T) => T): T

fold 是aggregate的简化,分区内和分区间计算规则相同

	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 3)
        rdd.mapPartitionsWithIndex(
            (index, datas) => { 
                println("分区号:" + index + "-> 分区数据:" + datas.mkString(","))
                datas
            }
        ).collect()

        val res: Int = rdd.fold(10)(_ + _)
        println(res)
        sc.stop()
    }
---------------------------------------------------------------------------
输出结果
分区号:1-> 分区数据:2
分区号:2-> 分区数据:3,4
分区号:0-> 分区数据:1
50

初始值为10,3个分区,所以要另外加上30,分区间操作还要加上初始值,那么总共要另外加40,40再加上原始的总值10,结果为50

9.countByKey()

统计每种key的个数

def countByKey(): Map[K, Long]
  • 参数解释
    • 第一个参数代表出现的key
    • 第二个参数为此key出现的次数
	def main(args: Array[String]): Unit = { 
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
        val res: collection.Map[Int, Long] = rdd.countByKey()
        println(res)
        sc.stop()
    }
----------------------------------------------------------------------------------------
输出结果
Map(1 -> 2, 2 -> 1, 3 -> 2)

10.save算子

saveAsTextFile(path)

  • 保存成Text文件
    • 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统
    • 对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

**saveAsSequenceFile(path) **

  • 保存成Sequencefile文件
    • 将数据集中的元素以Hadoop Sequencefile的格式保存到指定的目录下
    • 可以使HDFS或者其他Hadoop支持的文件系统。

saveAsObjectFile(path)

  • 序列化成对象保存到文件
    • 将RDD中的元素序列化成对象,存储到文件中。
		// save 相关算子
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 1)

        // 保存为文本文件,指定保存路径
        rdd.saveAsTextFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output")

        // 保存为序列化文件
        rdd.saveAsObjectFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output1")

        // 保存为SequenceFile (只支持KV类型的RDD)
        rdd.map((_, 1)).saveAsSequenceFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output2")

11.foreach()

遍历RDD中每一个元素,前面的代码大多数地方都用过这个算子了,这里就不再赘述。

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服