A summary of two ways to convert a Spark RDD into a DataFrame, in Java and Scala
1. Prepare the data source

Create a student.txt file in the project root with the following contents (one record per line):

1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
2. Implementation
Java version:
1. First create a Student bean that implements Serializable and overrides toString(). Note that createDataFrame discovers the columns through the bean's properties, so the getter/setter pairs are required. The code is as follows:
package com.cxd.sql;

import java.io.Serializable;

@SuppressWarnings("serial")
public class Student implements Serializable {
    String sid;
    String sname;
    int sage;

    public String getSid() {
        return sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public String getSname() {
        return sname;
    }

    public void setSname(String sname) {
        this.sname = sname;
    }

    public int getSage() {
        return sage;
    }

    public void setSage(int sage) {
        this.sage = sage;
    }

    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}
2. The conversion itself, showing both approaches; the code is as follows:
package com.cxd.sql;

import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TxtToParquetDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        reflectTransform(spark);   // via Java reflection
        dynamicTransform(spark);   // via a dynamically built schema
    }

    /**
     * Conversion via Java reflection on the Student bean.
     * @param spark
     */
    private static void reflectTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();
        JavaRDD<Student> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });
        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage")
          .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }

    /**
     * Dynamic conversion via an explicitly built schema.
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();
        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);
            return RowFactory.create(sid, sname, sage);
        });

        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("sid", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sname", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sage", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }
}
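To sanity-check the Parquet output, you can read the result directory back and inspect it. A minimal sketch from the spark-shell (so written in Scala), assuming the "parquet.res" path used above and the spark session the shell provides:

// Read back the Parquet directory written by reflectTransform above
// and confirm the schema and rows round-tripped correctly.
val checkDF = spark.read.parquet("parquet.res")
checkDF.printSchema()
checkDF.show()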
Scala version:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row

object RDD2Dataset {

  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]) {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    reflectCreate(spark)
    dynamicCreate(spark)
  }

  /**
   * Conversion via reflection on the Student case class.
   * @param spark
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    val stuRDD = spark.sparkContext.textFile("student.txt")
    // toDF() is provided by the implicit conversions imported above
    val stuDF = stuRDD.map(_.split(","))
      .map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
      .toDF()
    //stuDF.select("id", "name", "age").write.text("result") // write with explicit column names
    stuDF.printSchema()
    stuDF.createOrReplaceTempView("student")
    val nameDF = spark.sql("select name from student where age < 20")
    //nameDF.write.text("result") // write the query result to a file
    nameDF.show()
  }

  /**
   * Dynamic conversion via an explicitly built schema.
   * @param spark
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    val schemaString = "id,name,age"
    val fields = schemaString.split(",")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))
    val stuDF = spark.createDataFrame(rowRDD, schema)
    stuDF.printSchema()
    stuDF.createOrReplaceTempView("student")
    val nameDF = spark.sql("select name from student where age < 20")
    //nameDF.write.text("result") // write the query result to a file
    nameDF.show()
  }
}
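One thing to note about dynamicCreate: it declares every field as StringType, so age is stored as a string and Spark has to coerce it when evaluating age < 20. If you want a properly typed column instead, you can mix types when building the field list. A minimal sketch, assuming the same student.txt and a SparkSession named spark as above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Schema whose age column is a real integer instead of a string.
val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Parse age to Int so each Row matches the declared field types.
val rowRDD = spark.sparkContext.textFile("student.txt")
  .map(_.split(","))
  .map(p => Row(p(0), p(1), p(2).trim.toInt))

val stuDF = spark.createDataFrame(rowRDD, schema)
stuDF.printSchema() // age is now integer, so the SQL filter needs no cast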
Notes:
1. All of the code above has been tested, under Spark 2.1.0 and JDK 1.8.
2. It will not run on Spark versions before 2.0, because it relies on SparkSession, which was introduced in Spark 2.0 (older versions use SQLContext/HiveContext instead).
That is the whole of this summary of two ways to convert a Spark RDD into a DataFrame in Java and Scala; I hope it serves as a useful reference.