大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—
不温不火
,本意是希望自己性情温和
。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/
项目代码博主已经打包到Github需要的可以自行下载:https://github.com/459804692/spark0729
目录
- 一. 需求分析
- 二. 思路
- 三. 项目实现
- 1. bean类
- 2. 具体实现
一. 需求分析
对于排名前 10 的品类,分别获取每个品类点击次数排名前 10 的 sessionId。(注意: 这里我们只关注点击次数, 不关心下单和支付次数)
这个就是说,对于 top10 的品类,每一个都要获取对它点击次数排名前 10 的 sessionId。
这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的 session 的行为。
二. 思路
- 过滤出来 category Top10的日志
- 需要用到需求1的结果, 然后只需要得到categoryId就可以了
- 转换结果为 RDD[(categoryId, sessionId), 1] 然后统计数量 => RDD[(categoryId, sessionId), count]
- 统计每个品类 top10. => RDD[categoryId, (sessionId, count)] => RDD[categoryId, Iterable[(sessionId, count)]]
- 对每个 Iterable[(sessionId, count)]进行排序, 并取每个Iterable的前10
- 把数据封装到 CategorySession 中
三. 项目实现
1. bean类
- 1. 创建SessionInfo
case class SessionInfo(sessionId:String,
count: Long) extends Ordered[SessionInfo]{
// 按照降序排列
// else if (this.count == that.count) 0 这个不能加,否则会去重
override def compare(that: SessionInfo): Int =
if (this.count >that.count) -1
else 1
2. 具体实现
- 1. 主类实现
package com.buwenbuhuo.spark.core.project.app
import com.buwenbuhuo.spark.core.project.bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object ProjectApp {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("ProjectAPP").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
// 把数据从文件读出来
val sourceRDD: RDD[String] = sc.textFile("D:/user_visit_action.txt")
// 把数据封装好(封装到样例类中)
// sourceRDD.collect.foreach(println)
val userVisitActionRDD: RDD[UserVisitAction] = sourceRDD.map(line => {
val fields: Array[String] = line.split("_")
UserVisitAction(
fields(0),
fields(1).toLong,
fields(2),
fields(3).toLong,
fields(4),
fields(5),
fields(6).toLong,
fields(7).toLong,
fields(8),
fields(9),
fields(10),
fields(11),
fields(12).toLong)
})
// 需求1:
val categoryTop10: List[CategoryCountInfo] = CategoryTopApp.calcCatgoryTop10(sc , userVisitActionRDD)
// 需求2:top10品类的top10session
CategorySessionTopApp.statCategorySessionTop10(sc,categoryTop10,userVisitActionRDD)
// 关闭项目(sc)
sc.stop()
}
}
- 2. 解决方案1(原始方法,没任何优化)
package com.buwenbuhuo.spark.core.project.app
import com.buwenbuhuo.spark.core.project.bean.{CategoryCountInfo, SessionInfo, UserVisitAction}
import org.apache.spark.{Partitioner, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
object CategorySessionTopApp {
def statCategorySessionTop10(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={
// 1. 过滤出来只包含 top10 品类id的那些点击记录
// 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容
val cids: List[Long] = categoryTop10.map(_.categoryId.toLong)
val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))
// 2.每个品类top10session的计算
// 2.1 先map出来需要字段
val cidSidAndOne: RDD[((Long, String), Int)] =
filteredUserVisitActionRDD.map(action =>((action.click_category_id,action.session_id),1))
// 2.2 做聚合操作 得到RDD[((cid,sid),count)]
val cidSidAndCount: RDD[((Long, String), Int)] = cidSidAndOne.reduceByKey(_ + _)
// map 出来想要的数据结构
val cidAndSidCount: RDD[(Long, (String, Int))] = cidSidAndCount.map {
case ((cid, sid), count) => (cid, (sid, count))
}
// 2.3 分组 排序取Top10
val cidAandSidCountItRDD: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey()
// 2.4 对每个值排序取top10
// 解法1: 最原始的写法 最不好的写法,it.toList一时爽,不过最终可能会因为内存原因而爆掉
val result = cidAandSidCountItRDD mapValues((it:Iterable[(String,Int)]) =>{
// 只能使用scala排序,scala排序必须把所有数据全部加载到内存才能排。
// 如果数据量很小可以 ,数据量大就不行了
it.toList.sortBy(-_._2).take(10)
})
result.collect.foreach(println)
}
}
- 3. 解决方案2
def statCategorySessionTop10_2(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={
// 1. 过滤出来只包含 top10 品类id的那些点击记录
// 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容
val cids: List[Long] = categoryTop10.map(_.categoryId.toLong)
val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))
// 方案2写法1:
// 方案2写法2:
// 2. 需要排10次
val temp: List[Map[Long, List[(String, Int)]]] = cids.map(f = cid => {
// 2.1 先过滤出来点击id是cid的那些记录
val cidUserVisitActionRDD: RDD[UserVisitAction] = filteredUserVisitActionRDD.filter(_.click_category_id == cid)
// 2.2 聚合
val r: Map[Long, List[(String, Int)]] = cidUserVisitActionRDD
.map(action => ((action.click_category_id, action.session_id), 1))
.reduceByKey(_ + _)
.map {
case ((cid, sid), count) => (cid, (sid, count))
}
.sortBy(-_._2._2)
.take(10)
.groupBy(_._1)
.map {
case (cid, arr) => (cid, arr.map(_._2).toList)
}
r
})
val result: List[(Long, List[(String, Int)])] = temp.flatMap(map => map)
result.foreach(println)
}
- 4. 解决方案3
def statCategorySessionTop10_3(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={
// 1. 过滤出来只包含 top10 品类id的那些点击记录
// 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容
val cids: List[Long] = categoryTop10.map(_.categoryId.toLong)
val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))
// 2.每个品类top10session的计算
// 2.1 先map出来需要字段
val cidSidAndOne: RDD[((Long, String), Int)] =
filteredUserVisitActionRDD.map(action =>((action.click_category_id,action.session_id),1))
// 2.2 做聚合操作 得到RDD[((cid,sid),count)]
val cidSidAndCount: RDD[((Long, String), Int)] = cidSidAndOne.reduceByKey(_ + _)
// map 出来想要的数据结构
val cidAndSidCount: RDD[(Long, (String, Int))] = cidSidAndCount.map {
case ((cid, sid), count) => (cid, (sid, count))
}
// 2.3 分组 排序取Top10
val cidAandSidCountItRDD: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey()
// 2.4 对每个值排序取top10
val result = cidAandSidCountItRDD mapValues((it:Iterable[(String,Int)]) =>{
// 不要把Iterable直接转成list再排序
var set = mutable.TreeSet[SessionInfo]()
it.foreach{
case (sid,count) =>
val info: SessionInfo = SessionInfo(sid, count)
set += info
if(set.size > 10) set = set.take(10)
}
set.toList
})
// 起1 job
result.collect.foreach(println)
Thread.sleep(1000000)
}
- 5. 解决方案4
def statCategorySessionTop10_4(sc: SparkContext,categoryTop10: List[CategoryCountInfo],userVisitActionRDD: RDD[UserVisitAction]): Unit ={
// 1. 过滤出来只包含 top10 品类id的那些点击记录
// 1.1 先把top10品类id拿出来,转成Long id的目的是为了和UserVisitAction Clided兼容
val cids: List[Long] = categoryTop10.map(_.categoryId.toLong)
val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))
// 2.每个品类top10session的计算
// 2.1 先map出来需要字段
val cidSidAndOne: RDD[((Long, String), Int)] =
filteredUserVisitActionRDD.map(action =>((action.click_category_id,action.session_id),1))
// 2.2 做聚合操作 得到RDD[((cid,sid),count)]
val cidSidAndCount: RDD[((Long, String), Int)] =
cidSidAndOne.reduceByKey(new CategorySessionPartitioner(cids),_ + _)
// 2.3 cidSidAndCount 执行mapPartitions
val result: RDD[(Long, List[SessionInfo])] = cidSidAndCount.mapPartitions(it => {
// 不要把Iterable直接转成list再排序
var set = mutable.TreeSet[SessionInfo]()
var categoryId = -1L
it.foreach {
case ((cid, sid), count) =>
categoryId = cid
val info: SessionInfo = SessionInfo(sid, count)
set += info
if (set.size > 10) set = set.take(10)
}
// set.map((categoryId, _)).toIterator
Iterator((categoryId, set.toList))
})
result.collect.foreach(println)
Thread.sleep(1000000)
}
}
class CategorySessionPartitioner(cids:List[Long]) extends Partitioner {
private val cidIndexMap: Map[Long, Int] = cids.zipWithIndex.toMap
// 分区和品类id数量保持一致,可以保证一个的分区只有一个cid
override def numPartitions: Int = 10
// (Long,String) => (cid,sessionId)
override def getPartition(key: Any): Int = key match {
// 使用这个cid在数组中的下标作为分区的索引非常合适
case (cid:Long,_) => cidIndexMap(cid)
}
}
综合上述四种方法 最后一种方法是最完美的
本次的分享就到这里了,
好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
如果我的博客对你有帮助、如果你喜欢我的博客内容,请“点赞” “评论”“收藏”
一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注
我哦!