
[Spark in Action] Log Analysis (9): Running the Data Cleaning Job on YARN

1. Packaging

Modify the code:

package com.kinglone.log

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * Data cleaning job using Spark, meant to run on YARN.
 * Unlike the local version, no .master() is hard-coded here;
 * the master is supplied by spark-submit (--master yarn).
 */
object SparkStatCleanJobYARN {

  def main(args: Array[String]): Unit = {

    if (args.length != 2) {
      println("Usage: SparkStatCleanJobYARN <inputPath> <outputPath>")
      System.exit(1)
    }

    val Array(inputPath, outputPath) = args

    val spark = SparkSession.builder().getOrCreate()

    val accessRDD = spark.sparkContext.textFile(inputPath)

    // RDD ==> DataFrame, using the schema defined in AccessConvertUtil
    val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),
      AccessConvertUtil.struct)

    // Write one Parquet file per "day" partition, overwriting previous output
    accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite)
      .partitionBy("day").save(outputPath)

    spark.stop()
  }
}
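
The job depends on AccessConvertUtil, which was built in an earlier installment of this series and is not repeated here. For orientation, here is a minimal sketch of the shape the job expects; the field names and the tab-separated parsing are illustrative assumptions, not the original implementation — only the struct/parseLog members and the day column (required by partitionBy("day")) are fixed by the code above:

package com.kinglone.log

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object AccessConvertUtil {

  // Output schema of the cleaned records; it must contain "day",
  // since the job partitions the Parquet output by that column.
  val struct = StructType(Array(
    StructField("url", StringType),
    StructField("traffic", LongType),
    StructField("ip", StringType),
    StructField("day", StringType)
  ))

  // Parse one raw log line into a Row matching `struct`.
  // Assumes tab-separated fields: time, url, traffic, ip (illustrative).
  def parseLog(log: String): Row = {
    try {
      val splits = log.split("\t")
      val time = splits(0)                               // e.g. "2017-05-11 14:09:14"
      val day = time.substring(0, 10).replace("-", "")   // "20170511"
      Row(splits(1), splits(2).toLong, splits(3), day)
    } catch {
      case _: Exception => Row(null, 0L, null, "unknown")
    }
  }
}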

Modify the pom.xml file by adding:

<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <mainClass>com.kinglone.log.SparkStatCleanJobYARN</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </plugin>
  </plugins>
</build>

Package: run mvn assembly:assembly, which produces cleanJobYARN-1.0-SNAPSHOT-jar-with-dependencies.jar

2. Upload files

a. Upload cleanJobYARN-1.0-SNAPSHOT-jar-with-dependencies.jar to the /opt/script directory
b. Upload access.log, ipDatabase.csv, and ipRegion.xlsx to the /opt/bigdata directory
c. On HDFS, create the /imooc/input/ directory and upload access.log to it;
   create the /imooc/ipfile/ directory and upload ipDatabase.csv and ipRegion.xlsx to it:

hadoop fs -mkdir -p /imooc/input/
hadoop fs -mkdir -p /imooc/ipfile/
hadoop fs -put access.log /imooc/input
hadoop fs -put ipDatabase.csv /imooc/ipfile
hadoop fs -put ipRegion.xlsx /imooc/ipfile
hadoop fs -ls -R /imooc/

3. Start Hadoop: the job needs both HDFS (for the input and output paths) and YARN (to run on); start-all.sh launches both.
cd /opt/hadoop/sbin
./start-all.sh
4. Submit the Spark job

./spark-submit \
  --class com.kinglone.log.SparkStatCleanJobYARN \
  --name SparkStatCleanJobYARN \
  --master yarn \
  --files hdfs://hadoop01:9000/imooc/ipfile/ipDatabase.csv,hdfs://hadoop01:9000/imooc/ipfile/ipRegion.xlsx \
  /opt/script/cleanJobYARN-1.0-SNAPSHOT-jar-with-dependencies.jar \
  hdfs://hadoop01:9000/imooc/input/* \
  hdfs://hadoop01:9000/imooc/clean

Here --files distributes the two IP-lookup files to the executors; the next line is the local application jar to run; the last two arguments are the job's <inputPath> (all files under /imooc/input/) and <outputPath>.
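
Files distributed with --files land in each executor's working directory. The IP-parsing utility from the earlier installments presumably resolves them through SparkFiles; a minimal sketch of that lookup, with the variable names being illustrative only:

import org.apache.spark.SparkFiles

// Files shipped via `spark-submit --files ...` can be located on any node
// by their base name; SparkFiles.get returns the local absolute path.
val ipDatabasePath: String = SparkFiles.get("ipDatabase.csv")
val ipRegionPath: String = SparkFiles.get("ipRegion.xlsx")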

After the job is submitted, you can check the generated files under hdfs://hadoop01:9000/imooc/clean.

5. Once the output has been generated, start Spark in local mode to inspect the results
./spark-shell --master local[2] --jars /opt/mysql-connector-java-5.1.22-bin.jar
spark.read.format("parquet").load("hdfs://hadoop01:9000/imooc/clean/day=20170511/part-00000-8145a47b-2267-4a6d-9d65-37a3b5a5848a.c000.snappy.parquet").show(false)
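
Rather than pointing at one specific part file, you can also load the whole partitioned directory and query it. A quick sketch (the temp-view name access_clean is just an illustration):

val cleanDF = spark.read.format("parquet").load("hdfs://hadoop01:9000/imooc/clean")
cleanDF.printSchema()

// Register a temporary view and aggregate by the partition column
cleanDF.createOrReplaceTempView("access_clean")
spark.sql("SELECT day, COUNT(1) AS cnt FROM access_clean GROUP BY day").show()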
