欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

一个有意思的spark代码,今天闲的蛋疼了

程序员文章站 2024-03-12 20:36:08
...
package com.wby.fans.incre

import java.util.Date
import com.wby.annotation.Workflow
import com.wby.data.common.Common.{platformFilterSQLParms, refreshTable}
import com.wby.data.common.{CodeTransform, Common}
import com.wby.spark.WorkflowTrait
import com.wby.utils.UtilDate
import org.apache.spark.sql.functions.{col, lit, udf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.StatCounter

@Workflow("incre.fansbe.test")
class Test extends WorkflowTrait {
  override def formatArgsListSingleton(args: Seq[String]): List[Array[String]] = {
    var list: List[Array[String]] = Nil;
    list :+= Array("all");
    list
  }
  override def processPrototype(arrayArgs: Array[String], seqNum: Int): String = {
    val platformString = "+bilibili"
    val tableHogwartsAccount = "dm_account.hogwarts_account"
    val tableCrawlerMedia = "dwd_crawler_snapshot.snapshot_crawler_media_info"
    val platformFilter = platformFilterSQLParms("platform_type", platformString.substring(0, 1), platformString.substring(1))
    val startTime = UtilDate.convertToString(new Date(), "yyyy-MM-dd HH:mm:ss")
    val getIdOrPlatform = udf(CodeTransform.getIdOrPlatform _)
    val hogwartsAccountSQL =
      s"""
         |select
         |identify_id,weibo_type
         |from ${tableHogwartsAccount}
         |where identify_id is not null AND weibo_type is not null
         |""".stripMargin
    refreshTable(sparkSession, tableHogwartsAccount)
    val hogwartsAccountDF = sparkSession.sql(hogwartsAccountSQL)
      .withColumn("platform_type", getIdOrPlatform(col("weibo_type"), lit("platform")))
      .filter(s" platform_type is not null AND ${platformFilter}")
      .select("identify_id", "platform_type")
      .persist(StorageLevel.DISK_ONLY)
    val hc = hogwartsAccountDF.count()
    println("hogwartsAccountDF.count:" + hc)
    val dsOfCrawlerMediaMax = Common.getMaxFieldPartitionTable(sparkSession, tableCrawlerMedia, "ds")
    println("dsOfCrawlerMediaMax=" + dsOfCrawlerMediaMax)
    var arr: Array[String] = Array()
    val basePath = s"/user/hive/apache_warehouse/dwd_crawler_snapshot.db/snapshot_crawler_media_info/"
    val listmy = platformString.substring(1).split(",").filter(_.trim.length > 0).toList
    val listall = Common.getListFieldPartitionTable(sparkSession, tableCrawlerMedia, "site")
    val list: List[String] = platformString.substring(0, 1) match {
      case "+" => listall intersect listmy //求交集
      case "-" => listall diff (listmy) //求差集
      case _ => {
        println("error"); Nil
      }
    }
    list.foreach(site => {
      val ss = basePath + s"site=$site/ds=$dsOfCrawlerMediaMax"
      println("HDFS :" + ss + "/nda=01/")
      arr = arr :+ (ss + "/nda=01")
    })
    val raw_media_all_data_ =
      sparkSession
        .read
        .option("basePath", basePath)
        .parquet(arr: _*)//分区超多,不想开sparkjob遍历,直接指定文件,读数据
        .select("identify_id", "platform_type", "media_id", "media_created_time", "media_status", "media_play_num", "media_like_num", "media_comment_num", "media_repost_num", "media_barrage_num", "media_contribute_num", "media_collect_num", "fetched_time")
        .filter("fetched_time != -1 AND fetched_time IS NOT NULL AND identify_id != 'None' AND identify_id IS NOT NULL AND media_id != 'None' AND media_id IS NOT NULL  AND platform_type IS NOT NULL  AND media_created_time <= fetched_time")
        .withColumn("media_created_at_fixed", col("media_created_time"))
    val inputDF = raw_media_all_data_.join(hogwartsAccountDF, Seq("identify_id", "platform_type"), "leftsemi")
    .persist(StorageLevel.DISK_ONLY) //In语法优化,leftsemi

    val ic = inputDF.count()
    println("==================input====================")
    println("StartTime:" + startTime)
    println("hogwartsAccountDF.count:" + hc)
    println("inputDF.count:" + ic)
    println("StopTime:" + UtilDate.convertToString(new Date(), "yyyy-MM-dd HH:mm:ss"))
    println("===========================================")
    val dd = inputDF.select("identify_id", "platform_type", "media_id", "media_play_num", "media_like_num", "media_comment_num", "media_repost_num", "media_barrage_num", "media_contribute_num", "media_collect_num")
      .rdd.map(row => ((row.getAs[String]("identify_id"), row.getAs[String]("platform_type")),
      Array(
        row.getAs[java.lang.Integer]("media_play_num"), row.getAs[java.lang.Integer]("media_like_num"), row.getAs[java.lang.Integer]("media_comment_num"), row.getAs[java.lang.Integer]("media_repost_num"), row.getAs[java.lang.Integer]("media_barrage_num"), row.getAs[java.lang.Integer]("media_contribute_num"), row.getAs[java.lang.Integer]("media_collect_num")
      )))
      //aggregateByKey中用StatCounter类对于多列的处理
      .aggregateByKey(Array(new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter()))(
        { (s, v) =>
          Array(
            if (null ne (v(0))) (s(0) merge v(0).toFloat) else s(0),
            if (null ne (v(1))) (s(1) merge v(1).toFloat) else s(1),
            if (null ne (v(2))) (s(2) merge v(2).toFloat) else s(2),
            if (null ne (v(3))) (s(3) merge v(3).toFloat) else s(3),
            if (null ne (v(4))) (s(4) merge v(4).toFloat) else s(4),
            if (null ne (v(5))) (s(5) merge v(5).toFloat) else s(5),
            if (null ne (v(6))) (s(6) merge v(6).toFloat) else s(6)
          )
        },
        { (s, t) =>
          Array(
            s(0) merge t(0), s(1) merge t(1), s(2) merge t(2), s(3) merge t(3), s(4) merge t(4), s(5) merge t(5), s(6) merge t(6)
          )
        }
      ).map(f => (f._1._2, Array(
      f._2(0).stdev,
      f._2(1).stdev,
      f._2(2).stdev,
      f._2(3).stdev,
      f._2(4).stdev,
      f._2(5).stdev,
      f._2(6).stdev
    )))
    println(dd.sample(true, 0.01, 5).collect().foreach(s => println(s._1 + "_" + s._2.mkString("/"))))//抽样数据
//视频属性在账号下方差,这种方差作为账号属性在平台下的方差
    val ddddd = dd.aggregateByKey(Array(new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter(), new StatCounter()))(
      (s, v) => Array(
        if (v(0) == v(0)) (s(0) merge v(0)) else s(0),
        // if (v(0) == v(0)),判断是否统计值是NaN
        if (v(1) == v(1)) (s(1) merge v(1)) else s(1),
        if (v(2) == v(2)) (s(2) merge v(2)) else s(2),
        if (v(3) == v(3)) (s(3) merge v(3)) else s(3),
        if (v(4) == v(4)) (s(4) merge v(4)) else s(4),
        if (v(5) == v(5)) (s(5) merge v(5)) else s(5),
        if (v(6) == v(6)) (s(6) merge v(6)) else s(6),
        (s(7) merge v(0))),
      (s, t) => Array(s(0) merge t(0), s(1) merge t(1), s(2) merge t(2), s(3) merge t(3), s(4) merge t(4), s(5) merge t(5), s(6) merge t(6), s(7) merge t(7))
    ).map(f => (f._1,
      f._2(0).stdev + "_" + f._2(0).count,
      f._2(1).stdev + "_" + f._2(1).count,
      f._2(2).stdev + "_" + f._2(2).count,
      f._2(3).stdev + "_" + f._2(3).count,
      f._2(4).stdev + "_" + f._2(4).count,
      f._2(5).stdev + "_" + f._2(5).count,
      f._2(6).stdev + "_" + f._2(6).count,
      f._2(7).stdev + "_" + f._2(7).count
    )).foreach(println(_));
    "done"
  }
}

输出:

com.wby.freamwork.FreamworkApplication --run=incre.fansbe.test --rxun=other.rmlogtrash --rudn=order.base.calc

                   _ooOoo_
                  o8888888o
                  88" . "88$
                  (| -_- |)
                  O\  =  /O
               ____/`---'\____
             .'  \\|     |//  `.
            /  \\|||  :  |||//  \
           /  _||||| -:- |||||-  \
           |   | \\\  -  /// |   |
           | \_|  ''\---/''  |   |
           \  .-\__  `-`  ___/-. /
         ___`. .'  /--.--\  `. . __
      ."" '<  `.___\_<|>_/___.'  >'"".
     | | :  `- \`.;`\ _ /`;.`/ - ` : | |
     \  \ `-.   \_ __\ /__ _/   .-` /  /
======`-.____`-.___\_____/___.-`____.-'======
                   `=---='
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Application-pre,args is [ --run=incre.fansbe.test --rxun=other.rmlogtrash --rudn=order.base.calc]
The executorNum is 1
The executorPoolNum is 1
The taskInfo is clazz=com.wby.fans.incre.Test,flagRunable=true,executorPoolNum=1,executorNum=1,listArgs=[{all}]
 ---- Start MonitorThread & WorkerThread ---- 
workflow_time_out_senconds is not set
[Fri Mar 15 12:16:01 CST 2019] [0/1] Active: 1, maxPoolSize: 1, deltaSeconds: 0 /NaN
[INFO ] 2019-03-15 12:16:05,754 method:org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:376)
Trying to connect to metastore with URI thrift://192.168.1.191:9083
[INFO ] 2019-03-15 12:16:06,023 method:org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:472)
Connected to metastore.
[Fri Mar 15 12:16:34 CST 2019] [0/1] Active: 1, maxPoolSize: 1, deltaSeconds: 33 /NaN
hogwartsAccountDF.count:2348
dsOfCrawlerMediaMax=20190313
HDFS :/user/hive/apache_warehouse/dwd_crawler_snapshot.db/snapshot_crawler_media_info/site=bilibili/ds=20190313/nda=01/
[Fri Mar 15 12:17:07 CST 2019] [0/1] Active: 1, maxPoolSize: 1, deltaSeconds: 66 /NaN
==================input====================
StartTime:2019-03-15 12:16:01
hogwartsAccountDF.count:2348
inputDF.count:41553
StopTime:2019-03-15 12:17:14
===========================================
bilibili_378601.02199897915/4285.986486080111/414.45942311255897/248.56220386840633/1159.0142047307604/1417.209559564444/NaN
bilibili_3810.5606178385647/442.0628411838701/56.03768573221259/36.80881536926839/79.46627516682086/112.12592127702776/NaN
bilibili_18351.457611710302/45.653043710140516/141.12028344642738/7.823784250604052/65.03234272267915/12.843535338838759/NaN
bilibili_32901.02420001119/1033.7826140226753/453.47333737313966/211.63253907946722/479.7176831799674/1756.9835256413194/NaN
bilibili_13724.590739967143/599.1648269015117/540.4330840791919/30.433601193954246/74.25219023774343/217.71456692895225/NaN
bilibili_1315.8782873396144/35.960916284210555/5.15511931016546/3.1396087108337016/4.288094577086753/6.094075409940446/NaN
bilibili_0.0/0.0/0.0/0.0/0.0/0.0/NaN
bilibili_2056.6784237913544/9.736814445985686/3.815174380753199/1.8027756377319946/15.57776192739723/6.4031242374328485/NaN
bilibili_208757.98136880432/6407.427353803934/1644.1743478887179/3597.1341203388497/1923.1467089211644/21288.698217745692/NaN
bilibili_13884.183800641651/423.3731687294319/97.94901735086474/73.1495044412469/358.81911877713543/784.611725632494/NaN
bilibili_249551.15885573442/15716.827498258037/1332.8870207185605/5462.119423813433/4203.1907177762/29217.337220219095/NaN
bilibili_114.72763422733715/9.438274467864767/2.684339522184202/0.6765768490622338/1.3837701921194636/3.5513642963868084/NaN
bilibili_17048.749012431643/357.91650174617854/236.32510563957345/83.94045508573325/361.3404058348428/534.8725704938203/NaN
bilibili_2972.5/408.5/23.5/22.0/109.5/55.5/NaN
bilibili_6292.57055758791/151.76809795064165/28.61235164516658/5.792715732327589/48.34827355299831/169.9182810124391/NaN
bilibili_0.0/0.0/0.0/0.0/0.0/0.0/NaN
bilibili_409398.4774805116/28892.600177310404/3227.517246343354/3855.3175887414477/5468.930522430468/83465.30028144034/NaN
bilibili_2885.3832483744686/186.64458202691017/31.105626500683123/7.389181280764467/85.50999941527307/299.08854876106506/NaN
bilibili_88744.02139221675/2247.9429668526336/548.539377701263/1513.6657872712706/1644.7311756028705/8507.600810778298/NaN
bilibili_36542.06651830971/232.12518614061753/134.51532421616352/183.84171319426957/109.64581196576846/457.04660258968727/NaN
bilibili_70952.8778006044/5593.159271725496/637.3262017993611/454.0202745717665/4421.505050287992/9858.223727201344/NaN
()
(bilibili,78109.17175655576_1968,4217.539512235822_1968,1085.8793992206508_1968,1548.220828600274_1968,1868.2973588227294_1968,8784.85809035205_1968,NaN_0,NaN_1971)
listFinished:done
stop
 ---- Close WorkerThread ---- 
 ---- Close MonitorThread ---- 
=============================
[0:done]
=============================
The taskInfo is clazz=com.wby.fans.incre.Test,flagRunable=true,executorPoolNum=1,executorNum=1,listArgs=[{all}]
Application-finished,args is [ --run=incre.fansbe.test --rxun=other.rmlogtrash --rudn=order.base.calc]

Process finished with exit code 0
相关标签: 蛋疼