DataFrame: Converting a Scala Class to a DataFrame with Spark SQL
The complete example is shown below:
import java.text.DecimalFormat

import com.alibaba.fastjson.JSON
import com.donews.data.AppConfig
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
  * Created by silentwolf on 2016/6/3.
  */
case class UserTag(suuid: String,
                   man: Float, woman: Float,
                   age10_19: Float, age20_29: Float, age30_39: Float,
                   age40_49: Float, age50_59: Float,
                   game: Float, movie: Float, music: Float, art: Float,
                   politics_news: Float, financial: Float, education_training: Float,
                   health_care: Float, travel: Float, automobile: Float,
                   house_property: Float, clothing_accessories: Float, beauty: Float,
                   it: Float, baby_product: Float, food_service: Float,
                   home_furnishing: Float, sports: Float, outdoor_activities: Float,
                   medicine: Float)

object UserTagTable {

  val log = LoggerFactory.getLogger(UserOverviewFirst.getClass)

  val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"

  def main(args: Array[String]) {
    val startTime = System.currentTimeMillis()
    val conf: com.typesafe.config.Config = ConfigFactory.load()
    val sc = new SparkContext()
    val sqlContext = new SQLContext(sc)
    var df1: DataFrame = null

    if (args.length == 0) {
      println("Arguments required -- appkey, startTime: 2016-04-10, endTime: 2016-04-11")
    } else {
      val appkey = args(0)
      val lastdate = args(1)
      df1 = loadDataFrame(sqlContext, appkey, "2016-04-10", lastdate)
      df1.registerTempTable("suuidTable")

      // Register UDFs callable from SQL: tagInfo builds a JSON tag string for a
      // suuid; intToString converts a Long id to a String.
      sqlContext.udf.register("tagInfo", (a: String) => userTagInfo(a))
      sqlContext.udf.register("intToString", (b: Long) => intToString(b))

      import sqlContext.implicits._

      // Key step: pair each distinct suuid from the temp table with the JSON
      // produced by the UDF, map every Row into a UserTag instance, and let
      // toDF() derive the DataFrame schema from the case class by reflection.
      sqlContext.sql("select distinct(suuid) as suuid, tagInfo(suuid) from suuidTable group by suuid").map {
        case Row(suuid: String, tagInfo: String) =>
          val tagInfoObj = JSON.parseObject(tagInfo)
          UserTag(suuid.toString,
            tagInfoObj.getFloat("man"), tagInfoObj.getFloat("woman"),
            tagInfoObj.getFloat("age10_19"), tagInfoObj.getFloat("age20_29"),
            tagInfoObj.getFloat("age30_39"), tagInfoObj.getFloat("age40_49"),
            tagInfoObj.getFloat("age50_59"),
            tagInfoObj.getFloat("game"), tagInfoObj.getFloat("movie"),
            tagInfoObj.getFloat("music"), tagInfoObj.getFloat("art"),
            tagInfoObj.getFloat("politics_news"), tagInfoObj.getFloat("financial"),
            tagInfoObj.getFloat("education_training"), tagInfoObj.getFloat("health_care"),
            tagInfoObj.getFloat("travel"), tagInfoObj.getFloat("automobile"),
            tagInfoObj.getFloat("house_property"), tagInfoObj.getFloat("clothing_accessories"),
            tagInfoObj.getFloat("beauty"), tagInfoObj.getFloat("it"),
            tagInfoObj.getFloat("baby_product"), tagInfoObj.getFloat("food_service"),
            tagInfoObj.getFloat("home_furnishing"), tagInfoObj.getFloat("sports"),
            tagInfoObj.getFloat("outdoor_activities"), tagInfoObj.getFloat("medicine"))
      }.toDF().registerTempTable("resultTable")

      val resultDF = sqlContext.sql(
        s"select '$appkey' as appkey, '$lastdate' as date, suuid, man, woman, age10_19, age20_29, age30_39, " +
          "age40_49, age50_59, game, movie, music, art, politics_news, financial, education_training, " +
          "health_care, travel, automobile, house_property, clothing_accessories, beauty, it, baby_product, " +
          "food_service, home_furnishing, sports, outdoor_activities, medicine " +
          "from resultTable where suuid is not null")

      // Persist the result to the user_tags table in HBase via the Phoenix connector.
      resultDF.write.mode(SaveMode.Overwrite).options(
        Map("table" -> "user_tags", "zkUrl" -> conf.getString("hbase.url"))
      ).format("org.apache.phoenix.spark").save()
    }
  }

  def intToString(suuid: Long): String = {
    suuid.toString
  }

  /** Builds a random JSON tag string for one suuid (demo data only). */
  def userTagInfo(num1: String): String = {
    val de = new DecimalFormat("0.00")
    // Gender shares sum to 1.
    val man = de.format(math.random).toFloat
    val woman = de.format(1 - man).toFloat
    // Age-band shares sum to 1.
    val age10_19 = de.format(math.random * 0.2).toFloat
    val age20_29 = de.format(math.random * 0.2).toFloat
    val age30_39 = de.format(math.random * 0.2).toFloat
    val age40_49 = de.format(math.random * 0.2).toFloat
    val age50_59 = de.format(1 - age10_19 - age20_29 - age30_39 - age40_49).toFloat
    // Interest scores, each an independent random value in [0, 1).
    val game = de.format(math.random).toFloat
    val movie = de.format(math.random).toFloat
    val music = de.format(math.random).toFloat
    val art = de.format(math.random).toFloat
    val politics_news = de.format(math.random).toFloat
    val financial = de.format(math.random).toFloat
    val education_training = de.format(math.random).toFloat
    val health_care = de.format(math.random).toFloat
    val travel = de.format(math.random).toFloat
    val automobile = de.format(math.random).toFloat
    val house_property = de.format(math.random).toFloat
    val clothing_accessories = de.format(math.random).toFloat
    val beauty = de.format(math.random).toFloat
    val it = de.format(math.random).toFloat
    val baby_product = de.format(math.random).toFloat
    val food_service = de.format(math.random).toFloat
    val home_furnishing = de.format(math.random).toFloat
    val sports = de.format(math.random).toFloat
    val outdoor_activities = de.format(math.random).toFloat
    val medicine = de.format(math.random).toFloat

    s"""{"man":$man,"woman":$woman,
       |"age10_19":$age10_19,"age20_29":$age20_29,"age30_39":$age30_39,
       |"age40_49":$age40_49,"age50_59":$age50_59,
       |"game":$game,"movie":$movie,"music":$music,"art":$art,
       |"politics_news":$politics_news,"financial":$financial,
       |"education_training":$education_training,"health_care":$health_care,
       |"travel":$travel,"automobile":$automobile,"house_property":$house_property,
       |"clothing_accessories":$clothing_accessories,"beauty":$beauty,"it":$it,
       |"baby_product":$baby_product,"food_service":$food_service,
       |"home_furnishing":$home_furnishing,"sports":$sports,
       |"outdoor_activities":$outdoor_activities,"medicine":$medicine}""".stripMargin
  }

  def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = {
    val path = s"$REP_HOME/appstatistic"
    ctx.read.parquet(path)
      .filter(s"timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'")
  }
}
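The heart of the technique is independent of the tagging logic above: define a case class, bring sqlContext.implicits._ into scope, and call toDF() on an RDD of instances; Spark SQL derives the column names and types from the case class fields by reflection. Here is a minimal, self-contained sketch against the same Spark 1.x API; the Person class and the sample data are invented purely for illustration:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical case class: toDF() will reflect its fields into the schema.
case class Person(name: String, age: Int)

object CaseClassToDF {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CaseClassToDF").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings toDF() into scope for RDDs of case classes

    // The schema (name: string, age: int) is inferred from Person automatically.
    val df = sc.parallelize(Seq(Person("Alice", 30), Person("Bob", 25))).toDF()
    df.registerTempTable("people")
    sqlContext.sql("select name from people where age > 26").show()

    sc.stop()
  }
}

The listing above also imports StructField and StructType without using them; they belong to the other common route, building a DataFrame from an RDD[Row] plus an explicit schema. That variant helps when no case class can be written up front, for example when the column names are only known at runtime. A sketch under the same Spark 1.x assumptions, reusing sc and sqlContext from the snippet above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Explicit schema instead of case-class reflection.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

val rowRDD = sc.parallelize(Seq(Row("Alice", 30), Row("Bob", 25)))
val df2 = sqlContext.createDataFrame(rowRDD, schema)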
That is everything in this walkthrough of converting a Scala class to a DataFrame with Spark SQL; I hope it serves as a useful reference.