MapPartition和Map的区别
前言:要学习spark程序开发,建议先学习spark-shell交互式学习,加深对spark程序开发的理解。spark-shell提供了一种学习api的简单方式,以及一个能够进行交互式分析数据的强大工具,可以使用scala编写(scala运行与java虚拟机可以使用现有的java库)或使用python编写。
1.启动spark-shell
spark-shell的本质是在后台调用了spark-submit脚本来启动应用程序的,在spark-shell中已经创建了一个名为sc的sparkcontext对象,在4个cpu核运行spark-shell命令如下:
spark-shell --master local[4]
如果指定jar包路径,则命令如下:
spark-shell --master local[4] --jars xxx.jar,yyy,jar
--master用来设置context将要连接并使用的资源主节点,master的值是standalone模式中spark的集群地址、yarn或mesos集群的url,或是一个local地址
--jars可以添加需要用到的jar包,通过逗号分隔来添加多个包。
2.加载text文件
spark创建sc后,可以加载本地文件创建rdd,这里测试是加载spark自带的本地文件readme.md,返回一个mappartitionsrdd文件。
scala> val textfile = sc.textfile("file:///opt/cloud/spark-2.1.1-bin-hadoop2.7/readme.md");
textfile: org.apache.spark.rdd.rdd[string] = file:///opt/cloud/spark-2.1.1-bin-hadoop2.7/readme.md mappartitionsrdd[9] at textfile at <console>:24
加载hdfs文件和本地文件都是使用textfile,区别是添加前缀(hdfs://和file://)进行标识,从本地读取文件直接返回mappartitionsrdd,而从hdfs读取的文件是先转成hadooprdd,然后隐试转换成mappartitionsrdd。想了解mappartitions可以看这篇mappartition和map的区别。
3.简单rdd操作
对于rdd可以执行transformation返回新的rdd,也可以执行action得到返回结果。first命令返回文件第一行,count命令返回文件所有行数。
scala> textfile.first(); res6: string = # apache spark scala> textfile.count(); res7: long = 104
接下来进行transformation操作,使用filter命令从readme.md文件中抽取出一个子集,返回一个新的filteredrdd。
scala> val textfilter = textfile.filter(line=>line.contains("spark")); textfilter: org.apache.spark.rdd.rdd[string] = mappartitionsrdd[16] at filter at <console>:26
链接多个transformation和action,计算包括"spark"字符串的行数。
scala> textfile.filter(line=>line.contains("spark")).count(); res10: long = 20
4.rdd应用的简单操作
(1)计算文本中单词最多的一行的单词数
scala> textfile.map(line =>line.split(" ").size).reduce((a,b) => if (a > b) a else b); res11: int = 22
先将每一行的单词使用空格进行拆分,并统计每一行的单词数,创建一个基于单词数的新rdd,然后对该rdd进行reduce操作返回最大值。
(2)统计单词
词频统计wordcount是大数据处理最流行的入门程序之一,spark可以很容易实现wordcount操作。
//这个过程返回的是一个(string,int)类型的键值对shuffledrdd(y执行reducebykey的时候需要进行shuffle操作,返回的是一个shuffle形式的rdd),最后用collect聚合统计结果
scala> val wordcount = textfile.flatmap(line =>line.split(" ")).map(x => (x,1)).reducebykey((a,b) => a+b); wordcount: org.apache.spark.rdd.rdd[(string, int)] = shuffledrdd[23] at reducebykey at <console>:26 scala> wordcount.collect [stage 7:> (0 + 0)
[stage 7:> (0 + 2)
res12: array[(string, int)] = array((package,1), (this,1), (version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (because,1), (python,2), (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (general,3), (have,1), (pre-built,1), (yarn,,1), ([http://spark.apache.org/developer-tools.html](the,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (locally.,1), (several,1), (this,2), (basic,1), (configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (hive,2), (info,1), (["specifying,1), ("yarn",1), ([params]`.,1), ([project,1), (prefer,1), (sparkpi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (master,1), (example,3), (["parallel,1), (ar...
//这里使用了占位符_,使表达式更为简洁,是scala语音的特色,每个_代表一个参数。
scala> val wordcount2 = textfile.flatmap(_.split(" ")).map((_,1)).reducebykey(_+_); wordcount2: org.apache.spark.rdd.rdd[(string, int)] = shuffledrdd[26] at reducebykey at <console>:26 scala> wordcount2.collect res14: array[(string, int)] = array((package,1), (this,1), (version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (because,1), (python,2), (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (general,3), (have,1), (pre-built,1), (yarn,,1), ([http://spark.apache.org/developer-tools.html](the,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (locally.,1), (several,1), (this,2), (basic,1), (configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (hive,2), (info,1), (["specifying,1), ("yarn",1), ([params]`.,1), ([project,1), (prefer,1), (sparkpi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (master,1), (example,3), (["parallel,1), (ar...
//spark默认不进行排序,如有需要排序输出,排序的时候将key和value互换,使用sortbykey方法指定升序(true)和降序(false)
scala> val wordcount3 = textfile.flatmap(_.split(" ")).map((_,1)).reducebykey(_+_).map(x=>(x._2,x._1)).sortbykey(false).map(x=>(x._2,x._1)); wordcount3: org.apache.spark.rdd.rdd[(string, int)] = mappartitionsrdd[34] at map at <console>:26 scala> wordcount3.collect res15: array[(string, int)] = array(("",71), (the,24), (to,17), (spark,16), (for,12), (##,9), (and,9), (a,8), (can,7), (run,7), (on,7), (is,6), (in,6), (using,5), (of,5), (build,4), (please,4), (with,4), (also,4), (if,4), (including,4), (an,4), (you,4), (you,4), (general,3), (documentation,3), (example,3), (how,3), (one,3), (for,3), (use,3), (or,3), (see,3), (hadoop,3), (python,2), (locally,2), (this,2), (hive,2), (sparkpi,2), (refer,2), (interactive,2), (scala,2), (detailed,2), (return,2), (shell,2), (class,2), (python,,2), (set,2), (building,2), (sql,2), (guidance,2), (cluster,2), (shell:,2), (supports,2), (particular,2), (following,2), (which,2), (should,2), (to,2), (be,2), (do,2), (./bin/run-example,2), (it,2), (1000:,2), (tests,2), (examples,2), (at,2), (`examples`,2), (that,2), (h...
5.rdd缓存使用rdd的cache()方法
推荐阅读
-
SQLserver中字符串查找功能patindex和charindex的区别
-
js parentElement和offsetParent之间的区别_javascript技巧
-
php include和require的区别深入解析
-
CSS的link和@import的区别_html/css_WEB-ITnose
-
windows 2003 server php中$_SERVER[PHP_SELF] 和 $_SERVER[SCRIPT_NAME]之间的区别
-
indexOf和lastIndexOf的区别
-
PHP中单引号和双引号的区别,php引号双引号_PHP教程
-
JavaScript操作DOM元素的childNodes和children区别_javascript技巧
-
request(域对象)和response的区别和用法
-
详细分析mybatis中resultType和resultMap的区别与联系