欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SparkSQL简单教程

程序员文章站 2024-03-15 21:56:24
...

当面对一堆格式化的数据需要做一些统计分析的时候,awk是个非常不错的选择。但是当数据量上来以后,通过单机awk的方式处理就显得有些力不从心,这个时候我们可以通过SparkSQL来模拟sql的方式来处理这些海量数据,现在就给大家举个实例,看看怎么通过简单的几行代码用SparkSQL的方式来分析海量数据。

1.原始数据

在hdfs上有个路径为XXX,数据规模大概为100G左右,都是格式化的标准数据,每一行四个字段,其中某一行格式为u1 20170916 20171203 OPPO R11。现在我们想针对第四个字段做group by的操作。如果数据量比较小,例如1G左右的规模,用awk来处理是很方便的。但是100G的数据规模,用awk显然就不合适了。现在我们试着用SparkSQL来搞定这个需求。

2.在spark-shell中使用SparkSQL

2.1 先启动spark-shell,没啥好说的。
2.2 启动spark-shell以后,默认有SparkContext的入口,即sc变量,但是没有SparkSQL的入口,这个时候需要我们新建一个SparkSql的入口。

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@407e10a9

2.3 同时要将sqlContext的隐变量导入,因为在Scala中使用反射方式,进行RDD到DataFrame的转换,需要手动导入一个隐式转换:

scala> import sqlContext.implicits._
import sqlContext.implicits._

2.4 然后针对输入生成RDD:

scala> val rdd = sc.textFile("XXX").filter(x=> x.split("\t").length==4).map(x => x.split("\t")(3))
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:29

2.5 为了方便后面使用,定义一个case class:

scala> case class Model(model: String)
defined class Model

2.6 接下来是关键的一步:将我们将前面生成的rdd转化为DataFrame:

scala> val df = rdd.map(arr => Model(arr)).toDF()
df: org.apache.spark.sql.DataFrame = [model: string]

2.7 注册一个临时表:

scala> df.registerTempTable("tmp")
warning: there was one deprecation warning; re-run with -deprecation for details

2.8 接下来就可以使用类似的Sql语句来分析了:

scala> val res = sqlContext.sql("select upper(model), count(*) from tmp group by upper(model)").collect()
res: Array[org.apache.spark.sql.Row] = Array([OPPO R11,1xxx], [OPPO A57,1xxxxxxx], [VIVO Y66,1xxxxxxxx], [VIVO X9,1xxxx])

通过上面简单的几行代码,就能在spark-shell上用SparkSql分析海量数据了!