欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark小练习——求各科老师最受欢迎的TopN

程序员文章站 2022-06-14 08:10:11
...

Spark小练习——求各科老师最受欢迎的TopN

 【注】本文参考自小牛学堂学习视频

Spark小练习——求各科老师最受欢迎的TopN

数据格式http://bigdata.edu360.cn/laozhang

1.数据切分

val func=(line:String)=>{

  val index=line.lastIndexOf("/")

  val teacher=line.substring(index+1)

  val httpHost=line.substring(0,index)

  val subject=new URL(httpHost).getHost.split("[.]")(0)

  // (subject,teacher)

    //(teacher,1)

}

2.逻辑计算

2.1求所有科目中最受欢迎的老师topN

//拿到数据源

val lines=sc.textFile(path)

val teacherAndOne=lines.map(func)

val reduced=teacherAndOne.reduceByKey(_+_)

val sorted=reduced.sortBy(_._2,false)

val result=sorted.top(topN))

2.2求各科最受欢迎老师的topN

 

(1)使用Scala中的sortBy方法(适用于数据量较小的情况)

val lines=sc.textFile(path)

val subjectAndTeacher=lines.map(func)

val maped=subjectAndTeacher.map((_,1))

val reduced=maped.reduceByKey(_+_)

//按学科分组,得到的key是学科,value是学科对应的老师数据的迭代器

val grouped: RDD[(String, Iterable[((String, String), Int)])]=reduced.groupBy(_._1._1)

//将每一个组拿出来进行操作

//为什么可以调用scala的sortBy方法

//因为一个学科的数据已经在一台机器上的一个集合里了(缺点:在内存中序,

//如果数据量大的话,可能会出问题)

val sorted=grouped.mapValues(_.toList.sortBy(_._2).reverse.take(topN))

//数据量较小,所以就直接收集了,也可以把它存储到文件中

val result=sorted.collect()

 

(2)使用RDD中的sortBy方法(适用于数据量较大的情况)

val lines=sc.textFile(path)

val subjectAndTeacher=lines.map(func)

val subjects=subjectAndTeacher.keys.distinct()

val maped=subjectAndTeacher.map((_,1))

val reduced=maped.reduceByKey(_+_)

for(sb <- subjects){

  val filter=reduce.filter(_._1.equals(sb))

  //现在调用的是RDD上的sortBy方法,可在内存与磁盘中

  //take方法是先在Executor中取好前几个再通过网络发送到Driver,是个      //Action

  val r=filter.sortBy(_._2,false).take(topN)

  //然后把r收集或存储起来

}

(3)自定义分区器,以学科来分区      

  i.分区器SubjectPartitioner

class SubjectPartitioner(subjects:Array[String]) extends Partitioner{

  //相当于主构造器(new的时候会执行一次)

  //用于存放规则的一个map

  val rules=new mutable.HashMap[String,Int]()

  var i=0

  for(sb <- subjects){

    rules(sb)=i

    i=i+1

  }

  //返回分区的数量(下一个RDD有多少分区)

  override def numPartitions:Int =subjects.length

    //根据传入的key计算分区标号

  override def getPartition(key: Any):Int ={

    //key是一个元组(学科,老师)

    //sb:学科

    val sb=key.asInstanceOf[(String,String)]._1

    //根据规则计算分区编号

    rules(sb)

  }

}

    ii.逻辑

val lines=sc.textFile(path)

val subjectAndTeacher=lines.map(func)

val subjects=subjectAndTeacher.keys.distinct()

val maped=subjectAndTeacher.map((_,1))

//聚合

//第一次shuffle

val reduced=maped.reduceByKey(_+_)

//自定义分区器,按照指定的分区器来进行分区

//partitionBy按照指定的分区规则来分区

//第二次shuffle

val partitioned: RDD[((String, String), Int)] =reduce.partitionBy(new SubjectPartitioner(subjects))



//一次操作一个分区

val sorted=partitioned.mapPartitions(it => {

//将迭代器转换成List然后排序再转换成迭代器返回

  it.toList.sortBy(_._2).reverse.take(topN).iterator

 //缺点:又是加载到内存再排序

})

(4)减少shuffle次数

val lines=sc.textFile(path)

val subjectAndTeacher=lines.map(func)

val subjects=subjectAndTeacher.keys.distinct()

val maped=subjectAndTeacher.map((_,1))

//分区器

val sbPartitioner=new SubjectPartitioner(subjects)

//聚合

//第一次shuffle

val reduced=maped.reduceByKey(sbPartitioner,_+_)



//一次操作一个分区

val sorted=reduced.mapPartitions(it => {

//将迭代器转换成List然后排序再转换成迭代器返回

  it.toList.sortBy(_._2).reverse.take(topN).iterator

//缺点:又是加载到内存再排序

//优化:即排序,又不全部加载到内存

//考虑用一个定长TreeSet来装从迭代器中取出的数据,然后排序留下topN,后面的再装入新的数据,再排序,重复操作直到迭代完该学科的所有数据

})

 

相关标签: Spark