一个有意思的spark代码,今天闲的蛋疼了
程序员文章站
2024-03-12 20:36:08
...
package com.wby.fans.incre
import java.util.Date
import com.wby.annotation.Workflow
import com.wby.data.common.Common.{platformFilterSQLParms, refreshTable}
import com.wby.data.common.{CodeTransform, Common}
import com.wby.spark.WorkflowTrait
import com.wby.utils.UtilDate
import org.apache.spark.sql.functions.{col, lit, udf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.StatCounter
@Workflow("incre.fansbe.test")
class Test extends WorkflowTrait {

  /** Single argument set: the job always runs once over all data. */
  override def formatArgsListSingleton(args: Seq[String]): List[Array[String]] =
    List(Array("all"))

  /**
   * Exploratory job: for every bilibili account in the hogwarts account table,
   * compute the per-account standard deviation of seven media engagement
   * metrics (plays, likes, comments, reposts, barrages, contributions,
   * collections), then aggregate those per-account deviations into
   * per-platform statistics and print them.
   *
   * @param arrayArgs framework-supplied arguments (always Array("all") here; unused)
   * @param seqNum    framework-supplied sequence number (unused)
   * @return the literal string "done" on completion
   */
  override def processPrototype(arrayArgs: Array[String], seqNum: Int): String = {
    // "+" prefix = include-list of sites, "-" = exclude-list (see match below).
    val platformString = "+bilibili"
    val tableHogwartsAccount = "dm_account.hogwarts_account"
    val tableCrawlerMedia = "dwd_crawler_snapshot.snapshot_crawler_media_info"
    val platformFilter = platformFilterSQLParms("platform_type", platformString.substring(0, 1), platformString.substring(1))
    val startTime = UtilDate.convertToString(new Date(), "yyyy-MM-dd HH:mm:ss")
    val getIdOrPlatform = udf(CodeTransform.getIdOrPlatform _)

    val hogwartsAccountSQL =
      s"""
         |select
         |identify_id,weibo_type
         |from ${tableHogwartsAccount}
         |where identify_id is not null AND weibo_type is not null
         |""".stripMargin
    refreshTable(sparkSession, tableHogwartsAccount)
    // Tracked accounts, restricted to the configured platform(s).
    val hogwartsAccountDF = sparkSession.sql(hogwartsAccountSQL)
      .withColumn("platform_type", getIdOrPlatform(col("weibo_type"), lit("platform")))
      .filter(s" platform_type is not null AND ${platformFilter}")
      .select("identify_id", "platform_type")
      .persist(StorageLevel.DISK_ONLY)
    val hc = hogwartsAccountDF.count()
    println("hogwartsAccountDF.count:" + hc)

    val dsOfCrawlerMediaMax = Common.getMaxFieldPartitionTable(sparkSession, tableCrawlerMedia, "ds")
    println("dsOfCrawlerMediaMax=" + dsOfCrawlerMediaMax)

    val basePath = s"/user/hive/apache_warehouse/dwd_crawler_snapshot.db/snapshot_crawler_media_info/"
    val listmy = platformString.substring(1).split(",").filter(_.trim.length > 0).toList
    val listall = Common.getListFieldPartitionTable(sparkSession, tableCrawlerMedia, "site")
    val list: List[String] = platformString.substring(0, 1) match {
      case "+" => listall intersect listmy // intersection: keep only the listed sites
      case "-" => listall diff listmy      // difference: drop the listed sites
      case _   => println("error"); Nil
    }
    // Build explicit partition paths instead of letting Spark enumerate the
    // (very numerous) partitions: one path per site at the latest ds.
    val arr: Array[String] = list.map { site =>
      val ss = basePath + s"site=$site/ds=$dsOfCrawlerMediaMax"
      println("HDFS :" + ss + "/nda=01/")
      ss + "/nda=01"
    }.toArray

    val rawMediaAllData =
      sparkSession
        .read
        .option("basePath", basePath)
        .parquet(arr: _*) // read the chosen partition directories directly
        .select("identify_id", "platform_type", "media_id", "media_created_time", "media_status", "media_play_num", "media_like_num", "media_comment_num", "media_repost_num", "media_barrage_num", "media_contribute_num", "media_collect_num", "fetched_time")
        .filter("fetched_time != -1 AND fetched_time IS NOT NULL AND identify_id != 'None' AND identify_id IS NOT NULL AND media_id != 'None' AND media_id IS NOT NULL AND platform_type IS NOT NULL AND media_created_time <= fetched_time")
        .withColumn("media_created_at_fixed", col("media_created_time"))
    // left-semi join acts as an SQL IN: keep only media rows whose account is tracked.
    val inputDF = rawMediaAllData.join(hogwartsAccountDF, Seq("identify_id", "platform_type"), "leftsemi")
      .persist(StorageLevel.DISK_ONLY)
    val ic = inputDF.count()
    println("==================input====================")
    println("StartTime:" + startTime)
    println("hogwartsAccountDF.count:" + hc)
    println("inputDF.count:" + ic)
    println("StopTime:" + UtilDate.convertToString(new Date(), "yyyy-MM-dd HH:mm:ss"))
    println("===========================================")

    // Stage 1: per (account, platform), fold the seven metric columns into
    // seven StatCounters (one per metric), skipping null cells, then keep the
    // per-account stdev of each metric, re-keyed by platform alone.
    val dd = inputDF.select("identify_id", "platform_type", "media_id", "media_play_num", "media_like_num", "media_comment_num", "media_repost_num", "media_barrage_num", "media_contribute_num", "media_collect_num")
      .rdd.map(row => ((row.getAs[String]("identify_id"), row.getAs[String]("platform_type")),
        Array(
          row.getAs[java.lang.Integer]("media_play_num"), row.getAs[java.lang.Integer]("media_like_num"), row.getAs[java.lang.Integer]("media_comment_num"), row.getAs[java.lang.Integer]("media_repost_num"), row.getAs[java.lang.Integer]("media_barrage_num"), row.getAs[java.lang.Integer]("media_contribute_num"), row.getAs[java.lang.Integer]("media_collect_num")
        )))
      .aggregateByKey(Array(new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter()))(
        { (s, v) =>
          // Merge each non-null metric value into its counter; nulls leave the
          // counter untouched so they do not inflate the count.
          Array(
            if (null ne (v(0))) (s(0) merge v(0).toFloat) else s(0),
            if (null ne (v(1))) (s(1) merge v(1).toFloat) else s(1),
            if (null ne (v(2))) (s(2) merge v(2).toFloat) else s(2),
            if (null ne (v(3))) (s(3) merge v(3).toFloat) else s(3),
            if (null ne (v(4))) (s(4) merge v(4).toFloat) else s(4),
            if (null ne (v(5))) (s(5) merge v(5).toFloat) else s(5),
            if (null ne (v(6))) (s(6) merge v(6).toFloat) else s(6)
          )
        },
        { (s, t) =>
          Array(
            s(0) merge t(0), s(1) merge t(1), s(2) merge t(2), s(3) merge t(3), s(4) merge t(4), s(5) merge t(5), s(6) merge t(6)
          )
        }
      ).map(f => (f._1._2, Array(
        f._2(0).stdev,
        f._2(1).stdev,
        f._2(2).stdev,
        f._2(3).stdev,
        f._2(4).stdev,
        f._2(5).stdev,
        f._2(6).stdev
      )))
    // Print a small sample. NOTE: the original wrapped this whole expression
    // in println(...), which printed the Unit result "()" — fixed here.
    dd.sample(true, 0.01, 5).collect().foreach(s => println(s._1 + "_" + s._2.mkString("/")))

    // Stage 2: per platform, treat each account's metric stdevs as samples and
    // compute the stdev-of-stdevs. `v(i) == v(i)` is the standard NaN test for
    // Double (NaN != NaN), so NaN per-account stdevs are skipped for counters
    // 0-6; the 8th counter merges v(0) unconditionally (NaN included), so its
    // count reflects all accounts — presumably a sanity/total counter.
    dd.aggregateByKey(Array(new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter()))(
      (s, v) => Array(
        if (v(0) == v(0)) (s(0) merge v(0)) else s(0),
        if (v(1) == v(1)) (s(1) merge v(1)) else s(1),
        if (v(2) == v(2)) (s(2) merge v(2)) else s(2),
        if (v(3) == v(3)) (s(3) merge v(3)) else s(3),
        if (v(4) == v(4)) (s(4) merge v(4)) else s(4),
        if (v(5) == v(5)) (s(5) merge v(5)) else s(5),
        if (v(6) == v(6)) (s(6) merge v(6)) else s(6),
        (s(7) merge v(0))),
      (s, t) => Array(s(0) merge t(0), s(1) merge t(1), s(2) merge t(2), s(3) merge t(3), s(4) merge t(4), s(5) merge t(5), s(6) merge t(6), s(7) merge t(7))
    ).map(f => (f._1,
      f._2(0).stdev + "_" + f._2(0).count,
      f._2(1).stdev + "_" + f._2(1).count,
      f._2(2).stdev + "_" + f._2(2).count,
      f._2(3).stdev + "_" + f._2(3).count,
      f._2(4).stdev + "_" + f._2(4).count,
      f._2(5).stdev + "_" + f._2(5).count,
      f._2(6).stdev + "_" + f._2(6).count,
      f._2(7).stdev + "_" + f._2(7).count
    )).foreach(println(_))

    "done"
  }
}
输出:
com.wby.freamwork.FreamworkApplication --run=incre.fansbe.test --rxun=other.rmlogtrash --rudn=order.base.calc
_ooOoo_
o8888888o
88" . "88$
(| -_- |)
O\ = /O
____/`---'\____
.' \\| |// `.
/ \\||| : |||// \
/ _||||| -:- |||||- \
| | \\\ - /// | |
| \_| ''\---/'' | |
\ .-\__ `-` ___/-. /
___`. .' /--.--\ `. . __
."" '< `.___\_<|>_/___.' >'"".
| | : `- \`.;`\ _ /`;.`/ - ` : | |
\ \ `-. \_ __\ /__ _/ .-` / /
======`-.____`-.___\_____/___.-`____.-'======
`=---='
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Application-pre,args is [ --run=incre.fansbe.test --rxun=other.rmlogtrash --rudn=order.base.calc]
The executorNum is 1
The executorPoolNum is 1
The taskInfo is clazz=com.wby.fans.incre.Test,flagRunable=true,executorPoolNum=1,executorNum=1,listArgs=[{all}]
---- Start MonitorThread & WorkerThread ----
workflow_time_out_senconds is not set
[Fri Mar 15 12:16:01 CST 2019] [0/1] Active: 1, maxPoolSize: 1, deltaSeconds: 0 /NaN
[INFO ] 2019-03-15 12:16:05,754 method:org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:376)
Trying to connect to metastore with URI thrift://192.168.1.191:9083
[INFO ] 2019-03-15 12:16:06,023 method:org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:472)
Connected to metastore.
[Fri Mar 15 12:16:34 CST 2019] [0/1] Active: 1, maxPoolSize: 1, deltaSeconds: 33 /NaN
hogwartsAccountDF.count:2348
dsOfCrawlerMediaMax=20190313
HDFS :/user/hive/apache_warehouse/dwd_crawler_snapshot.db/snapshot_crawler_media_info/site=bilibili/ds=20190313/nda=01/
[Fri Mar 15 12:17:07 CST 2019] [0/1] Active: 1, maxPoolSize: 1, deltaSeconds: 66 /NaN
==================input====================
StartTime:2019-03-15 12:16:01
hogwartsAccountDF.count:2348
inputDF.count:41553
StopTime:2019-03-15 12:17:14
===========================================
bilibili_378601.02199897915/4285.986486080111/414.45942311255897/248.56220386840633/1159.0142047307604/1417.209559564444/NaN
bilibili_3810.5606178385647/442.0628411838701/56.03768573221259/36.80881536926839/79.46627516682086/112.12592127702776/NaN
bilibili_18351.457611710302/45.653043710140516/141.12028344642738/7.823784250604052/65.03234272267915/12.843535338838759/NaN
bilibili_32901.02420001119/1033.7826140226753/453.47333737313966/211.63253907946722/479.7176831799674/1756.9835256413194/NaN
bilibili_13724.590739967143/599.1648269015117/540.4330840791919/30.433601193954246/74.25219023774343/217.71456692895225/NaN
bilibili_1315.8782873396144/35.960916284210555/5.15511931016546/3.1396087108337016/4.288094577086753/6.094075409940446/NaN
bilibili_0.0/0.0/0.0/0.0/0.0/0.0/NaN
bilibili_2056.6784237913544/9.736814445985686/3.815174380753199/1.8027756377319946/15.57776192739723/6.4031242374328485/NaN
bilibili_208757.98136880432/6407.427353803934/1644.1743478887179/3597.1341203388497/1923.1467089211644/21288.698217745692/NaN
bilibili_13884.183800641651/423.3731687294319/97.94901735086474/73.1495044412469/358.81911877713543/784.611725632494/NaN
bilibili_249551.15885573442/15716.827498258037/1332.8870207185605/5462.119423813433/4203.1907177762/29217.337220219095/NaN
bilibili_114.72763422733715/9.438274467864767/2.684339522184202/0.6765768490622338/1.3837701921194636/3.5513642963868084/NaN
bilibili_17048.749012431643/357.91650174617854/236.32510563957345/83.94045508573325/361.3404058348428/534.8725704938203/NaN
bilibili_2972.5/408.5/23.5/22.0/109.5/55.5/NaN
bilibili_6292.57055758791/151.76809795064165/28.61235164516658/5.792715732327589/48.34827355299831/169.9182810124391/NaN
bilibili_0.0/0.0/0.0/0.0/0.0/0.0/NaN
bilibili_409398.4774805116/28892.600177310404/3227.517246343354/3855.3175887414477/5468.930522430468/83465.30028144034/NaN
bilibili_2885.3832483744686/186.64458202691017/31.105626500683123/7.389181280764467/85.50999941527307/299.08854876106506/NaN
bilibili_88744.02139221675/2247.9429668526336/548.539377701263/1513.6657872712706/1644.7311756028705/8507.600810778298/NaN
bilibili_36542.06651830971/232.12518614061753/134.51532421616352/183.84171319426957/109.64581196576846/457.04660258968727/NaN
bilibili_70952.8778006044/5593.159271725496/637.3262017993611/454.0202745717665/4421.505050287992/9858.223727201344/NaN
()
(bilibili,78109.17175655576_1968,4217.539512235822_1968,1085.8793992206508_1968,1548.220828600274_1968,1868.2973588227294_1968,8784.85809035205_1968,NaN_0,NaN_1971)
listFinished:done
stop
---- Close WorkerThread ----
---- Close MonitorThread ----
=============================
[0:done]
=============================
The taskInfo is clazz=com.wby.fans.incre.Test,flagRunable=true,executorPoolNum=1,executorNum=1,listArgs=[{all}]
Application-finished,args is [ --run=incre.fansbe.test --rxun=other.rmlogtrash --rudn=order.base.calc]
Process finished with exit code 0