spark求最受欢迎的老师的问题
程序员文章站
2022-05-26 17:53:51
...
文件内容:
http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
1.求最受欢迎的老师,不考虑课程类别(然后类似于wordCount)
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Example 1: find the overall most-popular teachers (like a word count),
 * ranking by total click count regardless of subject.
 */
object FavTeacher {
  def main(args: Array[String]): Unit = {
    // Silence Spark's internal logging so only our output is visible.
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Each input line is a URL such as http://bigdata.edu360.cn/zhangsan
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse every line into ((subject, teacher), 1) for counting.
    val word: RDD[((String, String), Int)] = lines.map(line => {
      // Teacher name is the path segment after the last '/'.
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      // Host is e.g. "bigdata.edu360.cn"; the subject is its first label.
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Sum the click counts per (subject, teacher) pair.
    val reduced = word.reduceByKey(_ + _)
    // Sort globally by count, descending (subject plays no role in ranking).
    val sorted = reduced.sortBy(_._2, false)
    // take(3) is an action: it collects the top three to the driver.
    val list = sorted.take(3)
    println(list.toBuffer)
    // FIX: release cluster resources (the original never stopped the context).
    sc.stop()
  }
}
//运行结果
//ArrayBuffer(((bigdata,lisi),15), ((javaee,laoyang),9), ((javaee,zhaoliu),6))
2.求每个学科最受欢迎的老师
根据学科分组然后排序
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Example 2: find the top-2 most-popular teachers per subject by grouping
 * on subject and sorting each group driver-style (in Scala collections).
 */
object FavTeacher {
  def main(args: Array[String]): Unit = {
    // Silence Spark's internal logging so only our output is visible.
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Each input line is a URL such as http://bigdata.edu360.cn/zhangsan
    val lines = sc.textFile("d:/data/teacher.log")
    val word: RDD[((String, String), Int)] = lines.map(line => {
      // Teacher name is the path segment after the last '/'.
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      // Host is e.g. "bigdata.edu360.cn"; the subject is its first label.
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Sum the click counts per (subject, teacher) pair.
    val reduced = word.reduceByKey(_ + _)
    // Group all (subject, teacher) counts by subject.
    val grouped = reduced.groupBy(_._1._1)
    // Sort within each group and keep the top 2. The group's values arrive as
    // a CompactBuffer (an Iterable); toList materializes the whole group in
    // memory at once, so a very large group could cause an OOM here.
    val sorted: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(-_._2).take(2))
    // FIX: the original defined `sorted` but never invoked an action, so the
    // lazy pipeline never executed and nothing was printed. collect() triggers
    // the job and brings the per-subject results to the driver.
    sorted.collect().foreach(println)
    // Release cluster resources.
    sc.stop()
  }
}
//运行结果
// (javaee,List(((javaee,laoyang),9), ((javaee,zhaoliu),6)))
// (python,List(((python,laoli),3), ((python,laoliu),1)))
// (bigdata,List(((bigdata,lisi),15), ((bigdata,wangwu),6)))
3.求各科最受欢迎的两名老师
创建一个数组 将不同的学科放在不同的RDD中 然后排序,取值
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* 根据学科取得的最受欢迎的前2名老师的排序
*/
/**
 * Example 3: top-2 teachers per subject by filtering each known subject into
 * its own RDD, then sorting with Spark's RDD sort (memory + disk spill).
 */
object FavTeacher2 {
  def main(args: Array[String]): Unit = {
    // Silence Spark's internal logging so only our output is visible.
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    // The subjects are hard-coded here; example 4 discovers them dynamically.
    val subjects = Array("javaee", "bigdata", "python")
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Each input line is a URL such as http://bigdata.edu360.cn/zhangsan
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse every line into ((subject, teacher), 1) for counting.
    val word: RDD[((String, String), Int)] = lines.map(line => {
      // Teacher name is the path segment after the last '/'.
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      // Host is e.g. "bigdata.edu360.cn"; the subject is its first label.
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Sum the click counts per (subject, teacher) pair.
    val reduced = word.reduceByKey(_ + _)
    // One pass per subject: filter that subject's records into their own RDD.
    for (sb <- subjects) {
      val filtered = reduced.filter(_._1._1 == sb)
      // RDD sortBy can spill to disk, avoiding the driver-side OOM risk of
      // example 2; take(2) is the action that runs the job.
      val sorted = filtered.sortBy(_._2, false).take(2)
      println(sorted.toBuffer)
    }
    // FIX: release cluster resources (the original never stopped the context).
    sc.stop()
  }
}
//运行结果
ArrayBuffer(((javaee,laoyang),9), ((javaee,zhaoliu),6))
ArrayBuffer(((bigdata,lisi),15), ((bigdata,wangwu),6))
ArrayBuffer(((python,laoli),3), ((python,laoliu),1))
4.求各科最受欢迎的两名老师
自定义分区器 将相同科目的老师放到同一个分区
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
/**
 * Example 4: top teachers per subject using a custom partitioner so that all
 * records of one subject are shuffled into the same partition, then each
 * partition is sorted independently.
 */
object FavTeacher3 {
  def main(args: Array[String]): Unit = {
    // Silence Spark's internal logging so only our output is visible.
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Each input line is a URL such as http://bigdata.edu360.cn/zhangsan
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse every line into ((subject, teacher), 1) for counting.
    val word: RDD[((String, String), Int)] = lines.map(line => {
      // Teacher name is the path segment after the last '/'.
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      // Host is e.g. "bigdata.edu360.cn"; the subject is its first label.
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Sum the click counts per (subject, teacher) pair.
    val reduced = word.reduceByKey(_ + _)
    // Discover the distinct subjects on the cluster, then collect them to the
    // driver: one partition will be created per subject.
    val subject: Array[String] = reduced.map(_._1._1).distinct().collect()
    // Custom partitioner routing every key of the same subject to one
    // partition. (FIX: corrected the "subjectPartitiioner" typo.)
    val subjectPartitioner = new SubjectPartitioner(subject)
    // Re-shuffle the aggregated RDD by subject.
    val sbPartitioner = reduced.partitionBy(subjectPartitioner)
    // Sort inside each partition. toList pulls the whole partition into
    // memory at once, so a very large partition could cause an OOM here.
    val sorted =
      sbPartitioner.mapPartitions(_.toList.sortBy(-_._2).iterator)
    // saveAsTextFile is the action that triggers the whole job.
    sorted.saveAsTextFile("d:/data/out/teacher")
    // FIX: release cluster resources (the original never stopped the context).
    sc.stop()
  }
}
//自定义分区器
/**
 * Custom partitioner mapping each subject to its own partition so that all
 * records of the same subject are shuffled together.
 *
 * @param subjects the distinct subject names; one partition per subject
 */
class SubjectPartitioner(subjects: Array[String]) extends Partitioner {
  // Subject -> partition index, built once at construction time.
  // (Idiomatic replacement for the original `var index` + mutable.HashMap loop.)
  val rules: Map[String, Int] = subjects.zipWithIndex.toMap

  // One partition per subject.
  override def numPartitions: Int = subjects.length

  // Keys are (subject, teacher) tuples; route by the subject component.
  override def getPartition(key: Any): Int = {
    val tuple = key.asInstanceOf[(String, String)]
    rules(tuple._1)
  }
}
// FIX: the original snippet was missing the class's closing brace.
上一篇: Ubuntu安装Vim编辑器
下一篇: greenDAO__最受欢迎的数据库框架