[Spark in Action] Log Analysis (9): Running the Data Cleaning Job on YARN
1. Packaging
Modify the code so that the master URL and the input/output paths are supplied at submit time instead of being hard-coded:
package com.kinglone.log

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * Run the data-cleaning job on YARN.
 */
object SparkStatCleanJobYARN {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("Usage: SparkStatCleanJobYARN <inputPath> <outputPath>")
      System.exit(1)
    }

    val Array(inputPath, outputPath) = args

    // No .master() here: the master is supplied by spark-submit (--master yarn)
    val spark = SparkSession.builder().getOrCreate()

    val accessRDD = spark.sparkContext.textFile(inputPath)

    // RDD ==> DataFrame, using the schema defined in AccessConvertUtil
    val accessDF = spark.createDataFrame(
      accessRDD.map(x => AccessConvertUtil.parseLog(x)),
      AccessConvertUtil.struct)

    // One Parquet file per day partition; overwrite any previous output
    accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite)
      .partitionBy("day").save(outputPath)

    spark.stop()
  }
}
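The job relies on AccessConvertUtil, which was built earlier in this series. For context, here is a minimal sketch of its shape; the field names and log layout below are illustrative assumptions, not necessarily the exact version from the earlier posts:

package com.kinglone.log

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

object AccessConvertUtil {

  // Schema of the cleaned records; "day" is also the write partition column
  val struct = StructType(
    Array(
      StructField("url", StringType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    ))

  // Parse one raw log line into a Row that matches `struct`
  def parseLog(log: String): Row = {
    try {
      val splits = log.split("\t")          // assumed tab-separated fields
      val time = splits(0)                  // e.g. 2017-05-11 14:09:14
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)
      val day = time.substring(0, 10).replaceAll("-", "")
      Row(url, traffic, ip, time, day)
    } catch {
      case _: Exception => Row(null, 0L, null, null, null)
    }
  }
}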
Modify pom.xml: add the assembly plugin inside the build's plugins section (shown below with its enclosing tags for context):
<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <!-- left empty: the main class is passed to spark-submit
                 with --class instead -->
            <mainClass></mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </plugin>
  </plugins>
</build>
Package: run mvn assembly:assembly, which produces cleanJobYARN-1.0-SNAPSHOT-jar-with-dependencies.jar under target/.
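Optionally, sanity-check that the main class actually made it into the fat jar before uploading it:

jar tf target/cleanJobYARN-1.0-SNAPSHOT-jar-with-dependencies.jar | grep SparkStatCleanJobYARN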
2. Upload the files
a. Upload cleanJobYARN-1.0-SNAPSHOT-jar-with-dependencies.jar to /opt/script
b. Upload access.log, ipDatabase.csv and ipRegion.xlsx to /opt/bigdata
c. On HDFS, create /imooc/input/ and put access.log into it; create /imooc/ipfile/ and put ipDatabase.csv and ipRegion.xlsx into it (run the following from /opt/bigdata, where the files were placed in step b):
hadoop fs -mkdir -p /imooc/input/
hadoop fs -mkdir -p /imooc/ipfile/
hadoop fs -put access.log /imooc/input
hadoop fs -put ipDatabase.csv /imooc/ipfile
hadoop fs -put ipRegion.xlsx /imooc/ipfile
hadoop fs -ls -R /imooc/
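The recursive listing should show the three files in place (permission, size and timestamp columns omitted here):

/imooc/input/access.log
/imooc/ipfile/ipDatabase.csv
/imooc/ipfile/ipRegion.xlsx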
3. Start Hadoop. HDFS must be up to serve the input, lookup and output paths, and YARN must be up to run the executors; start-all.sh starts both:
cd /opt/hadoop/sbin
./start-all.sh
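To confirm the daemons are running, jps should list the HDFS and YARN processes (the names below assume a single-node setup):

jps
# expected, roughly: NameNode, DataNode, SecondaryNameNode, ResourceManager, NodeManager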
4. Submit the Spark job
From Spark's bin directory:

./spark-submit \
  --class com.kinglone.log.SparkStatCleanJobYARN \
  --name SparkStatCleanJobYARN \
  --master yarn \
  --files hdfs://hadoop01:9000/imooc/ipfile/ipDatabase.csv,hdfs://hadoop01:9000/imooc/ipfile/ipRegion.xlsx \
  /opt/script/cleanJobYARN-1.0-SNAPSHOT-jar-with-dependencies.jar \
  hdfs://hadoop01:9000/imooc/input/* \
  hdfs://hadoop01:9000/imooc/clean

Here --files ships the two IP-lookup files to the cluster; the first positional argument is the local path of the fat jar built in step 1; the last two arguments are the program's own <inputPath> (every file under /imooc/input/) and <outputPath>.
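Code that needs to open the files shipped with --files (for example the IP-lookup utility) can resolve them by bare file name through SparkFiles; a minimal sketch, assuming the parsing utility accepts a local path:

import org.apache.spark.SparkFiles

// Resolves to the node-local copy of a file distributed with --files
val ipFilePath = SparkFiles.get("ipDatabase.csv")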
After the job finishes, check the output files:
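A quick way to check is to list the output directory recursively:

hadoop fs -ls -R /imooc/clean

Expect one day=... partition directory holding a single snappy-compressed Parquet part file, since the job coalesces to one partition before writing.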
5. Once the output is generated, start spark-shell in local mode to inspect the result
./spark-shell --master local[2] --jars /opt/mysql-connector-java-5.1.22-bin.jar
spark.read.format("parquet").load("hdfs://hadoop01:9000/imooc/clean/day=20170511/part-00000-8145a47b-2267-4a6d-9d65-37a3b5a5848a.c000.snappy.parquet").show(false)
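The part-file name contains a run-specific UUID, so yours will differ. Loading the output root instead is more robust, and also surfaces the day partition column:

spark.read.format("parquet").load("hdfs://hadoop01:9000/imooc/clean").show(false)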