
Spark: finding the most popular teacher


Contents of teacher.log (each line records one visit to a teacher's page):

http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi

1. Find the most popular teacher overall, regardless of subject (essentially a wordCount)

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FavTeacher {

 
  def main(args: Array[String]): Unit = {
    // Silence Spark's verbose logging
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse each URL into ((subject, teacher), 1)
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Sum the counts per (subject, teacher) pair
    val reduced = word.reduceByKey(_ + _)

    // Sort by count, descending, and take the top 3
    val sorted = reduced.sortBy(_._2, false)
    val list = sorted.take(3)
    println(list.toBuffer)

    // Release resources
    sc.stop()
  }
}



// Output:
//ArrayBuffer(((bigdata,lisi),15), ((javaee,laoyang),9), ((javaee,zhaoliu),6))
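
Since point 1 ignores the subject, the key could also be the teacher alone. A minimal wordCount-style sketch, assuming the same lines RDD as in the example above:

val teacherOnly = lines
  .map(line => (line.substring(line.lastIndexOf("/") + 1), 1)) // key by teacher only
  .reduceByKey(_ + _)
  .sortBy(_._2, false)
println(teacherOnly.take(3).toBuffer)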

2. Find the most popular teachers in each subject

  Group by subject, then sort within each group.

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FavTeacher {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    val reduced = word.reduceByKey(_ + _)

    // Group by subject; each value is a CompactBuffer (an Iterable) holding
    // every ((subject, teacher), count) entry for that subject
    val grouped = reduced.groupBy(_._1._1)
    // Sort within each group and take the top two. The sort runs in plain
    // Scala, not Spark: toList materializes the whole group in memory at
    // once, so a very large group will cause an OOM (out-of-memory) error
    val sorted: RDD[(String, List[((String, String), Int)])] =
      grouped.mapValues(_.toList.sortBy(-_._2).take(2))
    sorted.collect().foreach(println)

    // Release resources
    sc.stop()
  }
}

// Output:

//  (javaee,List(((javaee,laoyang),9), ((javaee,zhaoliu),6)))
//  (python,List(((python,laoli),3), ((python,laoliu),1)))
//  (bigdata,List(((bigdata,lisi),15), ((bigdata,wangwu),6)))
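
To avoid the OOM risk noted above, the grouping can be replaced with a bounded top-N aggregation that never materializes a whole group. A sketch using aggregateByKey, assuming the same reduced RDD as above and N = 2:

val topN = reduced
  .map { case ((subject, teacher), count) => (subject, (teacher, count)) }
  .aggregateByKey(List.empty[(String, Int)])(
    // within a partition: add one entry, keep only the two largest counts
    (acc, v) => (v :: acc).sortBy(-_._2).take(2),
    // across partitions: merge two bounded lists the same way
    (a, b) => (a ++ b).sortBy(-_._2).take(2)
  )
topN.collect().foreach(println)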

3. Find the two most popular teachers in each subject

Create an array of subjects, filter each subject's data into its own RDD, then sort and take the top entries.

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * Top two most popular teachers per subject, with each subject filtered
  * into its own RDD
  */
object FavTeacher2 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val subjects = Array("javaee", "bigdata", "python")
    val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse the data
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Aggregate
    val reduced = word.reduceByKey(_ + _)

    // Filter one subject at a time, so each subject's data goes into its own RDD
    for (sb <- subjects) {
      val filtered = reduced.filter(_._1._1 == sb)
      // Sort within the single subject (RDD sorting can spill: memory + disk)
      val sorted = filtered.sortBy(_._2, false).take(2)
      println(sorted.toBuffer)
    }

    // Release resources
    sc.stop()
  }
}

// Output:
// ArrayBuffer(((javaee,laoyang),9), ((javaee,zhaoliu),6))
// ArrayBuffer(((bigdata,lisi),15), ((bigdata,wangwu),6))
// ArrayBuffer(((python,laoli),3), ((python,laoliu),1))
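
One caveat: the loop above launches a separate Spark job per subject, and every job recomputes reduced from the raw log file. Caching the aggregated RDD before the loop avoids the repeated scans; a sketch with the same names as above:

// Keep the aggregated counts in memory across the per-subject jobs
reduced.cache()
for (sb <- subjects) {
  val top2 = reduced.filter(_._1._1 == sb).sortBy(_._2, false).take(2)
  println(top2.toBuffer)
}
reduced.unpersist()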

4. Find the two most popular teachers in each subject

  Use a custom partitioner so that all records for the same subject land in the same partition.

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object FavTeacher3 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse the data
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Aggregate
    val reduced = word.reduceByKey(_ + _)

    // First compute the distinct subject names on the cluster, then collect
    // them back to the driver (one partition will be created per subject)
    val subject: Array[String] = reduced.map(_._1._1).distinct().collect()

    // Build a custom partitioner keyed on subject, so that all records for
    // the same subject are shuffled into the same partition
    val subjectPartitioner = new SubjectPartitioner(subject)

    // Repartition the aggregated RDD with the custom partitioner
    val sbPartitioner = reduced.partitionBy(subjectPartitioner)
    // After repartitioning, sort within each partition and keep the top two
    val sorted = sbPartitioner.mapPartitions(_.toList.sortBy(-_._2).take(2).iterator)
    sorted.saveAsTextFile("d:/data/out/teacher")

    // Release resources
    sc.stop()
  }
}

// Custom partitioner
class SubjectPartitioner(subjects: Array[String]) extends Partitioner {
  // Built when the partitioner is instantiated (i.e. in the constructor):
  // String is the subject (partition key), Int is its partition index
  val rules = new mutable.HashMap[String, Int]()

  var index = 0
  // Initialize the subject -> partition-index rules
  for (sb <- subjects) {
    rules += ((sb, index))
    index += 1
  }

  // One partition per subject
  override def numPartitions: Int = subjects.length

  // Compute the partition number for a given key
  override def getPartition(key: Any): Int = {
    // The key is a (subject, teacher) tuple; cast it and look up the subject
    val tuple = key.asInstanceOf[(String, String)]
    val subject = tuple._1
    rules(subject)
  }
}
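
Note that partitionBy adds a second shuffle after reduceByKey. A one-shuffle variant is possible because reduceByKey accepts a partitioner, so aggregation and subject-based partitioning happen together; a sketch, assuming the word RDD from the example above:

// Derive the subject list from the raw pairs instead of from reduced
val subjectNames = word.map(_._1._1).distinct().collect()
// Aggregate and partition by subject in a single shuffle
val partitioned = word.reduceByKey(new SubjectPartitioner(subjectNames), _ + _)
// Sort within each partition (one subject per partition) and keep the top two;
// the bounded aggregation from point 2 applies here too if partitions are huge
val top2 = partitioned.mapPartitions(_.toList.sortBy(-_._2).take(2).iterator)
top2.collect().foreach(println)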