Spark Structured Streaming: Parsing MapReduce Job Information in Real Time
程序员文章站
2022-07-14 21:57:59
...
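Structured Streaming is the stream processing framework introduced in Spark 2.x (it first shipped in Spark 2.0). Spark has always processed streaming data as micro-batches, which ruled out millisecond-level latency, and Structured Streaming also used micro-batches when it was first introduced. Spark 2.3 added an experimental Continuous Processing mode that handles records as they arrive and can reach end-to-end latencies of around 1 ms. Structured Streaming also provides end-to-end exactly-once guarantees (given a replayable source such as Kafka and an idempotent sink), a point on which Spark had long been criticized and compared unfavorably with Flink. Spark and Flink are both excellent stream processing frameworks, and each has borrowed ideas from the other. Since Alibaba acquired data Artisans (the company behind Flink) in 2019, Flink has become very popular in China.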
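To make the two execution modes concrete, here is a minimal sketch that runs the same query either as micro-batches or in continuous mode. It assumes Spark 2.3 or later and uses the built-in rate source and console sink; the object name TriggerModes and the checkpoint paths are hypothetical. According to the Spark documentation, continuous mode is experimental, supports only map-like operations, and offers at-least-once rather than exactly-once guarantees.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Sketch: the same rate-source query run either as micro-batches or in
// continuous processing mode, chosen by the first command-line argument.
object TriggerModes {
  // Continuous mode takes a checkpoint interval, not a batch interval.
  def triggerFor(mode: String): Trigger = mode match {
    case "continuous" => Trigger.Continuous("1 second")
    case _            => Trigger.ProcessingTime("1 second")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("trigger-modes").getOrCreate()
    val mode = args.headOption.getOrElse("micro-batch")

    // The built-in "rate" source emits (timestamp, value) rows; a plain
    // projection like this is allowed in both execution modes.
    val query = spark.readStream.format("rate").option("rowsPerSecond", "10").load()
      .selectExpr("timestamp", "value * 2 AS doubled")
      .writeStream
      .format("console")
      // Hypothetical path; one directory per mode, since checkpoints are not shared across modes.
      .option("checkpointLocation", s"/tmp/trigger-modes-$mode")
      .trigger(triggerFor(mode))
      .start()
    query.awaitTermination()
  }
}
```

Passing continuous as the first argument to spark-submit selects Trigger.Continuous; anything else falls back to a one-second micro-batch trigger. In continuous mode each partition runs as a long-lived task, so the cluster needs at least as many cores as the query has partitions.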
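I had long wanted to write a program that parses MapReduce job information in real time. Recently I found some time to practice with Structured Streaming and wrote a demo in Scala; the code that parses historical job files was extracted from the Dr. Elephant source code and modified.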
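```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

// MrJobData, AnalysisMrJob.parseJob and the JobSchema case class are the author's own
// classes (the parsing logic is adapted from Dr. Elephant) and are not shown here.
object AnalysisJob {
  private val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("analysis-job").getOrCreate()
    import spark.implicits._

    // Read job messages from the Kafka topic "job1", starting at the earliest offset.
    val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "xxxxx:9092")
      .option("subscribe", "job1")
      .option("startingOffsets", "earliest")
      .load()

    val ds = df.selectExpr("CAST(value AS STRING)").as[String].mapPartitions { lines =>
      // Kerberos login and FileSystem creation happen once per partition, not once per record.
      val conf = new Configuration()
      UserGroupInformation.setConfiguration(conf)
      UserGroupInformation.loginUserFromKeytab("hadoop", "/etc/security/keytabs/hadoop.keytab")
      // newInstance returns an uncached FileSystem, so closing it cannot break other tasks
      // in the same executor that share the cached instance returned by FileSystem.get.
      val fs = FileSystem.newInstance(conf)
      try {
        lines.map { line =>
          var job = new MrJobData()
          try {
            val splits = line.split(",")
            job = AnalysisMrJob.parseJob(fs, splits(0).toLong, splits(1))
            logger.info(s"parsed job, sql = ${job.sql}")
          } catch {
            case ex: Exception => logger.error(s"Failed to parse job file for message: $line", ex)
          }
          JobSchema(job.jobId, job.user, job.queue, job.state, job.mtotals, job.rtotals,
            job.startTime, job.submitTime, job.finishTime, job.sql, job.sentryUser)
        }.toList.iterator // materialize before fs is closed, because Iterator.map is lazy
      } finally {
        fs.close()
      }
    }.filter(a => a.user != null && a.user.nonEmpty) // drop records that failed to parse

    ds.printSchema()

    // File sink: Parquet files under the Hive warehouse path, one directory per startTime value.
    val query = ds.writeStream.format("parquet")
      .option("checkpointLocation", "/user/hadoop/checkpoint/dir")
      .option("path", "/user/hive/warehouse/st")
      .partitionBy("startTime")
      .start()
    query.awaitTermination()
  }
}
```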
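The try/catch in the code above falls back to a default MrJobData on failure and relies on the later user filter to drop it. A more explicit alternative is to parse each message into an Option so that bad records are dropped by flatMap. This is a sketch that assumes only the message layout implied by the code (a numeric first field and a string second field, separated by commas); the JobMessage case class and its field names are hypothetical.

```scala
import scala.util.Try

// Hypothetical holder for the two fields the post passes to AnalysisMrJob.parseJob;
// the post does not say what they mean, so they are simply named first and second.
final case class JobMessage(first: Long, second: String)

object JobMessage {
  // Returns None instead of throwing, so malformed messages can be dropped with flatMap.
  // Fields after the second are ignored, matching splits(0) / splits(1) in the original.
  def parse(line: String): Option[JobMessage] =
    line.split(",", -1) match {
      case Array(first, second, _*) if second.trim.nonEmpty =>
        Try(first.trim.toLong).toOption.map(n => JobMessage(n, second.trim))
      case _ => None
    }
}
```

In the streaming query this could be applied as df.selectExpr("CAST(value AS STRING)").as[String].flatMap(line => JobMessage.parse(line).toList) with spark.implicits._ in scope, so that parseJob only ever receives well-formed input.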
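The Structured Streaming file sink supports partitioned output through partitionBy, writing one subdirectory per partition value (for example startTime=<value>), which is very useful when the output backs a partitioned table.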
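If startTime is a millisecond timestamp, partitioning by it directly creates one directory per distinct value, which quickly produces a very large number of tiny partitions. A common alternative is to partition by a derived day column. The sketch below assumes startTime holds epoch milliseconds (the post does not say) and names the new column dt; both the column name and the DatePartition object are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_unixtime}

object DatePartition {
  // Assumes startTime is epoch milliseconds: convert to seconds, then format as a
  // yyyy-MM-dd string in the session time zone. Works on batch and streaming DataFrames.
  def withDayColumn(df: DataFrame): DataFrame =
    df.withColumn("dt", from_unixtime((col("startTime") / 1000).cast("long"), "yyyy-MM-dd"))
}
```

It would slot in just before the sink, e.g. DatePartition.withDayColumn(ds.toDF()).writeStream.format("parquet") ... .partitionBy("dt"). Because from_unixtime formats in the session time zone, setting spark.sql.session.timeZone keeps day boundaries predictable. If a Hive table sits on top of the output path, Hive only sees new partition directories after they are registered, for example with MSCK REPAIR TABLE.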