欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark Structured Streaming 实时解析mr 任务

程序员文章站 2022-07-14 21:57:59
...

     Structured Streaming 是spark2.x后引入的实时计算框架。spark一直以来都是以微批来处理数据的,一直做不到毫秒级的实时处理,structured streaming最开始引入时也是用的微批处理数据,spark2.3后面引入了一个新的处理模式真正实现了实时计算,可以实现毫秒级的处理速度。structured streaming 实现了exactly-once ,这个一直被人诟病和拿来和flink比较。spark 和 flink 都是非常优秀的流计算处理框架,两者都在互相学习。flink 被阿里接手后,在国内现在变得非常火。

      一直想写个实时解析mr任务信息的程序,最近抽时间拿 structured streaming 练手用scala实现了个demo,其中解析历史任务的代码是从dr.elephant源码中提取出来修改的。

    val spark = SparkSession.builder.appName("analysis-job").getOrCreate()
    val  df =  spark.readStream.format("kafka")
           .option("kafka.bootstrap.servers", "xxxxx:9092")
           .option("subscribe", "job1")
           .option("startingOffsets", "earliest")
           .load()
    import spark.implicits._
//    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[MrJobData]
    val ds = df.selectExpr( "CAST(value AS STRING)").as[String].map(line => {
      val conf = new  Configuration()
      UserGroupInformation.setConfiguration(conf);
      UserGroupInformation.loginUserFromKeytab("hadoop","/etc/security/keytabs/hadoop.keytab");
      val fs = FileSystem.get(conf);
      var job = new MrJobData()
     try{
      val splits  = line.split(",")
      job = AnalysisMrJob.parseJob(fs,splits(0).toLong,splits(1))
      println("=================="+job.sql+"===================")
     }catch {
       case ex :Exception => logger.error("解析文件异常",ex)
     }finally {
        fs.close()
     }
      JobSchema(job.jobId,job.user,job.queue,job.state,job.mtotals,job.rtotals,job.startTime,job.submitTime,job.finishTime,job.sql,job.sentryUser)
    }).filter( a =>{ !"".equals(a.user) })
    println("================ ===================================================")
    ds.printSchema()
    println("===================================================================")

//    value.createOrReplaceTempView("test")
//    val frame = spark.sql("select * from test")

    val query =  ds.writeStream.format("parquet")
      .option("checkpointLocation", "/user/hadoop/checkpoint/dir")
      .option("path", "/user/hive/warehouse/st")
      .partitionBy("startTime")
      .start()

       query.awaitTermination()

structured streaming File sink 输出支持分区,这对于分区表非常有用。