[Spark in Action] imooc Log Analysis (5): Submitting the statistics and database-loading job to run on YARN
Notes in this series:
[Spark in Action] imooc Log Analysis (1): Initial data cleaning
[Spark in Action] imooc Log Analysis (2): Second-pass cleaning: parsing the logs
[Spark in Action] imooc Log Analysis (3): Storing, aggregating, and loading the cleaned data into the database
[Spark in Action] imooc Log Analysis (4): Submitting the data-cleaning job to run on YARN
[Spark in Action] imooc Log Analysis (5): Submitting the statistics and database-loading job to run on YARN
I. Implementing the statistics job for YARN
Create a new file TopNStatJobYARN.scala:
package com.imooc.log

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ListBuffer

/**
 * TopN statistics Spark job: runs on YARN
 */
object TopNStatJobYARN {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("Usage: TopNStatJobYARN <inputPath> <day>")
      System.exit(1)
    }

    val Array(inputPath, day) = args

    // Disable partition column type inference so the partition column
    // "day" is read back as a string rather than an inferred integer.
    val spark = SparkSession.builder()
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load(inputPath)

    // Delete any existing rows for this day so the job can be rerun safely
    StatDAO.deleteData(day)

    // Most popular TopN courses
    videoAccessTopNStat(spark, accessDF, day)

    // TopN courses by city
    cityAccessTopNStat(spark, accessDF, day)

    // Statistics by traffic
    videoTrafficsTopNStat(spark, accessDF, day)

    spark.stop()
  }

  /**
   * Statistics by traffic
   */
  def videoTrafficsTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    val trafficsTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)
    //.show(false)

    /**
     * Write the statistics results into MySQL
     */
    try {
      trafficsTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoTrafficsStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")
          list.append(DayVideoTrafficsStat(day, cmsId, traffics))
        })

        StatDAO.insertDayVideoTrafficsAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /**
   * TopN courses by city
   */
  def cityAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    val cityAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "city", "cmsId")
      .agg(count("cmsId").as("times"))
    //cityAccessTopNDF.show(false)

    // Window function in Spark SQL: rank courses within each city by access count
    val top3DF = cityAccessTopNDF.select(
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
      row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
        .orderBy(cityAccessTopNDF("times").desc)
      ).as("times_rank")
    ).filter("times_rank <= 3") //.show(false)  // Top 3 per city

    /**
     * Write the statistics results into MySQL
     */
    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
          list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
        })

        StatDAO.insertDayCityVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /**
   * Most popular TopN courses
   */
  def videoAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {

    /**
     * Statistics using the DataFrame API
     */
    import spark.implicits._
    val videoAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)

    videoAccessTopNDF.show(false)

    /**
     * Equivalent statistics using SQL
     */
    // accessDF.createOrReplaceTempView("access_logs")
    // val videoAccessTopNDF = spark.sql(s"select day,cmsId, count(1) as times from access_logs " +
    //   s"where day='$day' and cmsType='video' " +
    //   "group by day,cmsId order by times desc")
    //
    // videoAccessTopNDF.show(false)

    /**
     * Write the statistics results into MySQL
     */
    try {
      videoAccessTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")
          list.append(DayVideoAccessStat(day, cmsId, times))
        })

        StatDAO.insertDayVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}
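The job reuses the StatDAO layer built in Part 3 of this series. For readers starting from this part, here is a minimal sketch of the delete-then-insert pattern StatDAO follows. The MySQLUtils helper, the database name imooc_project, and the method bodies are illustrative assumptions based on Part 3, not verbatim course code; adjust them to your own environment.

import java.sql.{Connection, DriverManager, PreparedStatement}
import scala.collection.mutable.ListBuffer

// Illustrative JDBC helper; the URL and database name are assumptions.
object MySQLUtils {
  def getConnection(): Connection =
    DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/imooc_project?user=root&password=root")

  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) pstmt.close()
    } finally {
      if (connection != null) connection.close()
    }
  }
}

object StatDAO {

  // Delete the given day's rows from all three result tables, so the job
  // can be rerun for the same day without violating the primary keys.
  def deleteData(day: String): Unit = {
    val tables = Array("day_video_access_topn_stat",
      "day_video_city_access_topn_stat",
      "day_video_traffics_topn_stat")

    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      for (table <- tables) {
        pstmt = connection.prepareStatement(s"delete from $table where day = ?")
        pstmt.setString(1, day)
        pstmt.executeUpdate()
        pstmt.close()
      }
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

  // Batch-insert one partition's worth of results; autocommit is disabled
  // so each partition commits exactly once.
  def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(
        "insert into day_video_access_topn_stat(day, cms_id, times) values (?, ?, ?)")
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)
        pstmt.addBatch()
      }
      pstmt.executeBatch()
      connection.commit()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

  // insertDayCityVideoAccessTopN and insertDayVideoTrafficsAccessTopN follow
  // the same batch pattern against their respective tables.
}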
II. Packaging with Maven
1. Run mvn clean first, so that artifacts left over from earlier builds do not interfere.
2. Run mvn assembly:assembly to package the project. This produces sql-1.0-jar-with-dependencies.jar under target/, which is the jar submitted to YARN below.
III. Submitting the job to YARN
1. On the virtual machine, enter the MySQL command line with mysql -uroot -proot and create the three result tables. Note that day_video_city_access_topn_stat declares ENGINE=InnoDB DEFAULT CHARSET=utf8 so that Chinese city names are stored correctly. DDL:
create table day_video_access_topn_stat (
  day varchar(8) not null,
  cms_id bigint(10) not null,
  times bigint(10) not null,
  primary key (day, cms_id)
);

create table day_video_city_access_topn_stat (
  day varchar(8) not null,
  cms_id bigint(10) not null,
  city varchar(20) not null,
  times bigint(10) not null,
  times_rank int not null,
  primary key (day, cms_id, city)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

create table day_video_traffics_topn_stat (
  day varchar(8) not null,
  cms_id bigint(10) not null,
  traffics bigint(20) not null,
  primary key (day, cms_id)
);
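Each table maps one-to-one onto a case class that the foreachPartition blocks above fill and hand to StatDAO. For reference, the case classes (defined in Part 3; the signatures here are inferred from the getAs calls and constructor arguments in TopNStatJobYARN) look like this:

// Inferred from the constructor calls in TopNStatJobYARN
case class DayVideoAccessStat(day: String, cmsId: Long, times: Long)

case class DayCityVideoAccessStat(day: String, cmsId: Long, city: String, times: Long, timesRank: Int)

case class DayVideoTrafficsStat(day: String, cmsId: Long, traffics: Long)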
2. Copy the jar to the ~/lib directory of the hadoop user on the virtual machine.
3. Submit the job with spark-submit.
Notes:
① Compared with the data-cleaning job, the option --files /home/hadoop/lib/ipDatabase.csv,/home/hadoop/lib/ipRegion.xlsx \ is removed: those two files are only needed for IP lookup during cleaning, not for the statistics job.
② The two program arguments are the path of the cleaned data and the day to process, here 20170511.
Command:
/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/bin/spark-submit \
--class com.imooc.log.TopNStatJobYARN \
--name TopNStatJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
/home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \
hdfs://hadoop001:8020/imooc/clean 20170511
MySQL result check: query the three tables (day_video_access_topn_stat, day_video_city_access_topn_stat, day_video_traffics_topn_stat) to confirm that the day's statistics were inserted.
At this point, we have successfully submitted the statistics and database-loading job to run on YARN.