SparkSQL简单教程
当面对一堆格式化的数据需要做一些统计分析的时候,awk是个非常不错的选择。但是当数据量上来以后,通过单机awk的方式处理就显得有些力不从心,这个时候我们可以通过SparkSQL来模拟sql的方式来处理这些海量数据,现在就给大家举个实例,看看怎么通过简单的几行代码用SparkSQL的方式来分析海量数据。
1.原始数据
在hdfs上有个路径为XXX,数据规模大概为100G左右,都是格式化的标准数据,每一行四个字段,其中某一行格式为u1 20170916 20171203 OPPO R11
。现在我们想针对第四个字段做group by的操作。如果数据量比较小,例如1G左右的规模,用awk来处理是很方便的。但是100G的数据规模,用awk显然就不合适了。现在我们试着用SparkSQL来搞定这个需求。
2.在spark-shell中使用SparkSQL
2.1 先启动spark-shell,没啥好说的。
2.2 启动spark-shell以后,默认有SparkContext的入口,即sc变量,但是没有SparkSQL的入口,这个时候需要我们新建一个SparkSql的入口。
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@407e10a9
2.3 同时要将sqlContext的隐变量导入,因为在Scala中使用反射方式,进行RDD到DataFrame的转换,需要手动导入一个隐式转换:
scala> import sqlContext.implicits._
import sqlContext.implicits._
2.4 然后针对输入生成RDD:
scala> val rdd = sc.textFile("XXX").filter(x=> x.split("\t").length==4).map(x => x.split("\t")(3))
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:29
2.5 为了方便后面使用,定义一个case class:
scala> case class Model(model: String)
defined class Model
2.6 接下来是关键的一步:将我们将前面生成的rdd转化为DataFrame:
scala> val df = rdd.map(arr => Model(arr)).toDF()
df: org.apache.spark.sql.DataFrame = [model: string]
2.7 注册一个临时表:
scala> df.registerTempTable("tmp")
warning: there was one deprecation warning; re-run with -deprecation for details
2.8 接下来就可以使用类似的Sql语句来分析了:
scala> val res = sqlContext.sql("select upper(model), count(*) from tmp group by upper(model)").collect()
res: Array[org.apache.spark.sql.Row] = Array([OPPO R11,1xxx], [OPPO A57,1xxxxxxx], [VIVO Y66,1xxxxxxxx], [VIVO X9,1xxxx])
通过上面简单的几行代码,就能在spark-shell上用SparkSql分析海量数据了!
上一篇: 海量数据处理
下一篇: 哈希扩展——布隆过滤器