Spark 算子
Spark大数据计算框架
Spark(火花)出现不全是代替Map Reduce,里面有批处理(Spark运行在内存中,Map Reduce运行在硬盘中,根据自身情况选择适应的开发环境(比如:机器的数量,配置,资金等等))
了解Hadoop发展历史:2011年发布1.0版本,2012年发布稳定版,2013年发布2.X版本(里面出现了Yarn,得以重用)
2012年版本中
MR的缺点:
mr基于数据集的计算,所以面向数据
1.基本运算规则从存储介质中获取(采集)数据,然后进行计算,最后将结果存储到介质中,所以主要应用于一次性计算(单节点计算),不适合数据挖掘和机器学习这样的迭代计算和图形挖掘计算。
2.MR基于文件存储介质操作,所以性能非常慢
3.MR和Hadoop紧密耦合在一起,无法动态替换(违背了ocp原理)
4.RM中,之前的Job任务太重了,导致数据运算太慢。(既要做资源调度任务,又要做资源交互)后来Yarn就把Job职责分开(只做资源交互),AM:负责资源调度任务(AM里面有Driver,Driver与Task关联,RM和NM关联(解耦合了),中间有一个AM(资源,计算:两者之间有AM起到中间连接的作用))最终导致我们的计算框架是可插拔的(也就是Yarn计算,至今还在用RM的原因就是Yarn的存在。)
- M中:NM与Task之间也有一个中间链接Conntainer,Container相当于当前电脑里面的虚拟机,他会占用计算机的一定资源来进行计算,但是他的计算和外面的计算又没有关系,降低了偶合性。
- MR:比较繁杂,只有Map和Reduce两种操作,复杂的逻辑需要大量的样板代码,处理效率低,Map中甲结果写磁盘,Reduce写HDFS,多个Map通过HDFS交换数据
2013年10月发布2.X版本(Yarn):Yarn作用(就是资源调度框架),所以Yarn主要用于计算,和存储没有太大关系(存储是HDFS执行的)
Yarn架构图
Spark2013年6月发布
定义:Spark是一种基于内存的快速,通用,可扩展的大数据分析引擎
Spark基于Hadoop1.X架构思想,采用自己的方式去改善Hadoop1.X中的问题
交互式shell方便开发测试。
速度快,易用性强(支持java,Scala,Python,R语言,),通用性(客运与机器学),随处运行(Yarn,Mesos,Stand alone(Scala单机模式),Local(与Map Reduce运行很相似))。
Saprk基于内存,并且基于Scala语法开发,所以天生适合迭代式计算(这就是比Map Reduce好的原因)。
Master做资源调度的,Worker和NodeManager相似
资源调度里面的计算:在ApplicationMaster运行,
执行器:是Executor计算的
Saprk存储是在Hadoop中存储的
Saprk和Yarn做计算框架,HDFS做存储框架。
Spark架构设计->运行架构图
中间有ClusterManager作为中间传呼变量。保证两者之间没有耦合性。
Spark内置模块
两个重要角色:Driver(驱动器)可以理解为管理者,Executor(执行器)可以理解为执行者(计算的)
查看Yarn当前运行路径和配置信息:yarn application -list
19/09/29 21:36:32 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/09/29 21:36:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):0
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
[aaa@qq.com ~]#
[aaa@qq.com bin]# ./spark-shell
19/09/29 21:11:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://bigdata:4040
Spark context available as 'sc' (master = local[*], app id = local-1569805902675).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.3
/_/
其中:app id = local-1569805902675与Application-Id是一样的。
独立调度器是HDFS自己的支援调度器,Yarn是Hadoop的资源调度,Mesos是Apart另外一个资源调度(国外盛行)
Spark技术栈:
面试Spark版本:1.6或者2.2(主要)版本
自己虚拟机上安装:
运行模式:
Local模式:Local[ * ]模式就直接帮你按照cpu最多cores来设置线程数了。
共享文件夹:
直接搜索–》打开文件资源管理器–》输入ftp://192.168.10.150/–》密码:1–》直接下载下来。
Spark架构核心组件:
术语 | 说 明 |
---|---|
Application | 建立在Spark上的用户程序,包括Driver代码和运行在集群各节点Executor中的代码 |
Driver program | 驱动程序。Application中的main函数并创建****SparkContext |
Cluster Manager | 在集群(Standalone、Mesos、YARN)上获取资源的外部服务 |
Worker Node | 集群中任何可以运行Application代码的节点 |
Executor | 某个Application运行在worker节点上的一个进程 |
Task | 被送到某个Executor上的工作单元 |
Job | 包含多个Task组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个****Job |
Stage | 每个Job会被拆分成多组Task,作为一个TaskSet,其名称为****Stage |
在IDEA里面展示Spark程序,先设置xml文件
//书写格式要标准
//从官网下载
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>
SparkSession
- Spark2.0+应用程序
查看Spark程序运行界面的路径是:192.168.56.101:4040
Spark - WordCount实现
sc:表示Spark的上下文对象,本身就可以找到相关路径取读数据。
textFile(传送路径/相对路径):通过一种方法来得到文本文件的内容,这里面是传送路径(把文件传进来)读数据的额方式是一行一行读的。
flatMap(_.split(" ")):表示把文件传出去(可迭代)扁平化压平操作,按照空格分隔符将一行数据映射成一个个单词;
map((_,1)):返回值【map(x=>{(x,1)})简写成:map((_,1))】:对每一个元素操作,将单词映射为元组。
reduceByKey(_+_):相同的 Key 进行聚合,reduce聚合的时候是两两相加的,第一个参数作为第一个参数,第一个参数作为第二个参数(最后得到的而结果就是元组)。
collect:收集展现在控制台上(Driver)
scala> sc.textFile("/root/a.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res5: Array[(String, Int)] = Array((mm,1), (hello,4), (cmmagk,1), (world,1), (lice,1), (makkk,1), (kb02,2))
scala>
在IDEA中实现Spark步骤:
创建方法一个maven工程,不走前期博客可见(类似)
//使用 SparkSession之前需要导包
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
//运行代码
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//使用开发工具完成Spark WordCount的开发
//LocaL模式
//创建SparkConf对象
//设定Spark设计框架的运行(部署)环境
//App id
val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
//创建一个Spark上下文对象sc
val sc = new SparkContext(config)
//读取文件,将文件内容一行一行的读取出来,这里的String是一行的意思
val lines:RDD[String] = sc.textFile("d:/a.txt")
//将一行一行的数据分解一个一个的单词,这里的String是一个一个的意思
val words:RDD[String] = lines.flatMap(_.split(" "))
//为了统计方便,将单词数据进行结构数据转换
val wordToOne:RDD[(String,Int)] = words.map((_,1))
//对转换结构后的数据进行分组聚合
val wordToSum:RDD[(String,Int)] = wordToOne.reduceByKey(_+_)
//将统计结果采集后打印到控制台
val result:Array[(String,Int)] = wordToSum.collect()
//println(result)
result.foreach(println)
}
}
运行结果:
Spark Yarn部署流程:
Java IO回顾:
Java IO中体现装饰者设计模式,表示一种功能的扩展(比如:以前没有这个功能,但是现在我可以包装一下,修饰一下就有这个功能了)。
输入I,输出O
字节(rar,zip,dat,png,jpeg),字符(txt)字符流里面可以一行一行的读数据(BuffereReader(入),BuffereWriter(出))
//文件输入流
Input Stream in = new FileInputStream(“xxxxxxxx”)
//缓冲流,解决前面文件传输效率的问题
InputStream bufferIn = new BufferedInputStream( new FileInputStream(“xxxxxxx”))
//文件输入流
Input Stream in = File Input Stream(“xxxxxx”)
//使用字符流读取一行数据
Reader reader = new BuffereReader(new FileInputStream( in," UTF-8 " ))
RDD编程 - - - 编程模型
Spark中,RDD被标示为对象,通过对象的方法调用来的RDD进行转换。金国一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count,collect等),或者时向存储系统保存数据(saveAsTextFile等),在Spark中只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
mSpark RDD 概念
- RDD是将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存中,并执行正确的的操作
- RDD是用于数据转换的接口
- RDD指向了存储在HDFS,Cassandra,HBase等,或缓存(内存,内存+磁盘,仅磁盘等),或在故障或缓存回收时重新计算其他RDD分区中的数据。
- RDD是弹性分布式数据集(Resilient Distributed Datasets)
- RDD是只读的,分区记录的集合,每个分区
- RDD
RDD的特性:
一系列的分区(分片)信息,每个任务处理一个分区
每个分区上都有compute函数,计算该分区中的数据
RDD之间有一系列的依赖
分区函数决定数据(key-value)分配至哪个分区
最佳位置列表,将计算任务分派到其所在处理数据块的存储位置
创建RDD的方式,两种
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.count
//下面两个都是从内存创建,底层实现时一样的。
rdd.partitions.size
//一般正常使用 .makeRDD
val rdd = sc.makeRDD(List(1,2,3,4,5,6))
rdd.partitions.size
val rdd = sc.parallelize(List(1,2,3,4,5,6),5)
RDD概述
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据(计算)抽象。代码中是一个抽象类,他代表一个不可变,可分区,里面的元素可并行计算集合。
RDD:体现了装饰者设计模式,是将数据处理逻辑进行封装。
并行与并发的区别:
并发:线程并发(一个CPU的内核有多个线程来抢栈资源,这时只有一个线程资源能抢占上,其他的都抢占不上,应为线程堵塞了,最后线程释放之后其他的线程才有机会),这个时间会非常快。
并行:多个CPU迎接多个线程资源,不用抢占,可以同时执行多个任务。
RDD特点:
RDD表示只读的分区数据集,对RDD进行改动,只能通过RDD的上转换操作,有一个个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必须的信息。RDD之间存在依赖,RDD的执行按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。
算子:解决问题其实就是将问题的初始状态,通过一系列的操作(算子Operate)对问题的状态进行转换,然后达到完成(解决)状态。
Spark算子分为两大类:(1)转换算子(2)行动算子
RDD的转换(面试开发重点)
map(func)案例----映射,其实就是一种结构的变化
map是对每一天小数据做操作的
1)作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换组成
2)需求:创建一个1-10数组的RDD,将所有元素*2新城新的RDD
//创建
scala> var source = sc.parallelize(1 to 10)
//打印
scala>var source.collect()
res1:Array[Int] = Array(1,2,3,4,5,6,7,8,9,10)
//将所有元素*2
scala> val mapadd = source.map(_*2)
//打印最终结果
scala>mapadd.collect()
//在IDEA中展现
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_Oper1 {
def main(args: Array[String]): Unit = {
val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//创建Spark上下文对象
val sc = new SparkContext(config)
//map算子
val listRDD:RDD[Int] = sc.makeRDD(1 to 10)
val mapRDD:RDD[Int] = listRDD.map(x => x*2)//x => x*2也可以写成 _*2,因为x当前这个参数只是用一次,可以省略。
//行动算子 .collect()
mapRDD.collect().foreach(println)
}
}
mapPartitiond(func)案例 - - ->>对每一条分区做操作
1)作用:类似于map,但是独立地在RDD色每一个 (分区) 上运行,因此在类型为T的 RDD 上运行时,func的函数类型必须是Iterator [ T ] = > Iterator [ U ]。假设没有N个元素,由M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
2)需求:创建一个RDD,使每个元素 *2 组成新的RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_Oper2 {
def main(args: Array[String]): Unit = {
val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//创建Spark上下文对象
val sc = new SparkContext(config)
//mapPartitions
val listRDD:RDD[Int] = sc.makeRDD(1 to 10)
//mapPartitions可以对一个RDD中所有的分区进行遍历
//mapPartitions效率优于map算子,减少了发送到执行器的交互次数。
//严重问题:mapPartitions可能会产生内存溢出(简称:OOM),不释放(里面有引用程序存在),。
val mapPartitionsRDD:RDD[Int] = listRDD.mapPartitions(datas=>{datas.map(data=>data*2)})
mapPartitionsRDD.collect().foreach(println)
}
}
map与mapPartitions的区别:
1)map() :每次处理一条数据,做映射和转换用的(转换结构)
2)mapPartition():每次处理一个分区的数据,这个分区的数据处理完毕后,原RDD中分区的数据才能被释放,可能导致OOM(内存溢出)。
3)开发指导:当内存空间较大的时候建议使用mapPartition(),以提高效率。
mapPartitionsWithIndex(func)案例 — 有索引的概念(0,1,2,3号分区等)
1)作用:类似于mapPartitions,但是func带有一个整数参数表示分片的索引值,因此在类型为 T 的RDD上运行时,func的函数类型必须是(Int,Interator[ T ])=> Iterator[ U ];
2)需求:创建一个RDD,是每个元素跟所在分区形成一个元组组成一个新的RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_Oper3 {
def main(args: Array[String]): Unit = {
val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//创建Spark上下文对象
val sc = new SparkContext(config)
//mapPartitions
//这里的 2 表示分区数
val listRDD:RDD[Int] = sc.makeRDD(1 to 10 ,2)
//在多个参数的时候使用模式匹配,一般写法使用 { },可能会有多行
val tupleRDD: RDD[(Int,String)] = listRDD.mapPartitionsWithIndex {
//case( 函数入口 )=> { 逻辑 }
//case(分区号,数据集合)=>{返回的还是数据集合}
case (num, datas) => {
datas.map((_, "分区号" + num))
}
}
tupleRDD.collect().foreach(println)
}
}
Spark中Driver & Executor的区别
Driver与Executor可能不在一台机器上面,就需要通过网络传送数据。
所有RDD算子的计算功能都是Executor来执行的,其他的都在Driver中执行的(需要序列化)
flatMap(func)案例 — 扁平化处理
1)作用**:类似于map,但是每一个输入元素可以被映射为0或多个输出元素**(所以func应该返回一个序列,而不是一个单元)
2)需求:创建一个元素1-5的RDD,运用flat Map创建一个新的RDD,新的RDD为原RDD的每个元素的两倍(2,4,6,8,10)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_Oper4 {
def main(args: Array[String]): Unit = {
val config:SparkConf = new SparkConf().setMaster("LOCAT[*]").setAppName("WordCount")
//创建Spark上下文对象
val sc = new SparkContext(config)
val listRDD:RDD[List[Int]] = sc.makeRDD(Array(List(1,2),List(3,4)))
//flatMap
//1,2,3,4
val flatMapRDD:RDD[Int] = listRDD.flatMap(datas=>datas)
flatMapRDD.collect().foreach(println)
}
}
glom案例
1)作用:将每一个分区新城一个数组,形成新的RDD类型时RDD [ Array[ T ] ],可以进行最值,转换等
2 ) 需求:创建一个2个分区的RDD,并将每个分区的数据梵高一个数组
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_Oper5 {
def main(args: Array[String]): Unit = {
val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//创建Spark上下文对象
val sc = new SparkContext(config)
val listRDD:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8,9),2)
//将一个分区的数据放到一个数组中
val glomRDD:RDD[Array[Int]] = listRDD.glom()
glomRDD.collect().foreach(array=>{
//mkString(",") 将集合转化成字符串
println(array.mkString(","))
})
}
}
groupBy(func)案例
1)作用:分组,按照传入函数的返回值进行分组,将相同的key对应的值放入一个迭代器。
2)需求:创建一个RDD,按照元组模以2的值进行分组。
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Soark07_Oper6 {
def main(args: Array[String]): Unit = {
val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//创建Spark上下文对象
val sc = new SparkContext(config)
//生成数据,按照指定的规则进行分组
//分组后的数据形成了对偶元组(K-V),K表示分组得分key,V表示分组的数据集合
val listRDD:RDD[Int] = sc.makeRDD(List(1,2,3,4))
val groupByRDD:RDD[(Int,Iterable[Int])] = listRDD.groupBy(i=>i%2)
groupByRDD.collect().foreach(println)
}
}
filter(func)案例
1)作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
2)需求:创建一个RDD(由字符串组成),过滤出一个新的RDD(包含“xiao”子串)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark08_Oper7 {
def main(args: Array[String]): Unit = {
val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorCount")
//创建Spark上下文对象
val sc = new SparkContext(config)
//生成数据,按照指定的规则进行过滤
val listRDD:RDD[Int] = sc.makeRDD(List(1,2,3,4))
val filterRDD:RDD[Int] = listRDD.filter(x=>x%2==0)
filterRDD.collect().foreach(println)
}
}
sample()案例 - - ->> 采样(抽样)
1)作用:以指定的随机种子随机抽样出数量为fraction的数据,with Replacement表示是抽出的数据是否放回,true为返回的抽样,false为五方会的抽样,seed用于指定随机数生成器种子。
2)在Java中没有真正的随机数,都是通过一套随机算法,按照某种计算规则的出相应的结果,只要随机数种子相同,后面的随机数就相同,因为都是按照相同的随机算法执行的。
注意:一般情况下都是拿时间戳作为随机数种子,这样可以避免种子相同产生随机数据不变的情况。
数据倾斜:就是数据分配不均衡造成的。解决方法:加随机数,反转,加时间戳等等
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark09_Oper8 {
def main(args: Array[String]): Unit = {
val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//创建Spark上下文对象
val sc = new SparkContext(config)
//生成数据,按照指定的规则进行过滤
val listRDD:RDD[Int] = sc.makeRDD(1 to 10)
//从指定的数据集合中进行抽样处理,根据不同的算法进行抽样。
//这里面的0.4代表一种比较标准,通过随机数算法,进行比较得出数据
//只要种子相同(比如:0.4)则无论那一次,结果都是一样的。
//这里的false表示不放回的抽取
val sampleRDD:RDD[Int] = listRDD.sample(false,0.4,1)
sampleRDD.collect().foreach(println)
}
}
distinct([numTasks])案例—去重操作
**注意:**将rdd中一个分区的数据打乱重新组合到其他不同分区的操作,称之为shuffle操作,只要是一个分区中没有打乱,则不能称作shuffle操作(shuffle有一个洗牌的概念)。(这里面的数据通过sheffle过程将数据打乱重新组合,会出现数据顺序重组。)
在Spark中所有的转换算子中没有shuffle的算子性能比较快。,可以告诉利用资源
1)作用:对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTacks参数改变它。
2)需求:创建一个RDD,使用distinct()对其去重。
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark10_Oper9 {
def main(args: Array[String]): Unit = {
val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//创建Spark上下文对象
val sc = new SparkContext(config)
//从指定的数据集合中进行抽样处理,根据不同的算法进行抽样
val listRDD:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,2,9,7,1))
//val distinctRDD:RDD[Int] = listRDD.distinct()
//可以改变分区
//使用distinct算子对数据去重,但是因为去重后导致数据减少,所以可以改便默认的分区数量
val distinctRDD:RDD[Int] = listRDD.distinct(2)
distinctRDD.collect().foreach(println)
}
}
coalesce(numPartitions)案例— 只是缩减分区(合并分区)只要没有打乱重组就一定没有shuffle
1)作用:缩减分区数,用于大数据过滤后,提高小数据集的执行效率
2)创建一个4个分区RDD,对其缩减分区
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark11_Oper9 {
def main(args: Array[String]): Unit = {
val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//创建Spark上下文对象
val sc = new SparkContext(config)
//缩减分区数
val listRDD:RDD[Int] = sc.makeRDD(1 to 10 ,4)
println("缩减分区前="+listRDD.partitions.size)
val coalesceRDD:RDD[Int] = listRDD.coalesce(3)
println("所减分区后="+coalesceRDD.partitions.size)
sc.stop()
}
}
Driver(驱动器)
Spark的驱动器是执行开发程序中的main方法进程它负责开发人员编写的用来创建Spark shell,创建RDD,以及进行RDD的转化操作和行动操作代码的执行。如果你是用Sparkshell ,那么当你启动Spark shell的时候,系统后台自启了一个Spark驱动器程序,就是在Spark shell中预加载的一个叫做sc的Spark Context对象,如果驱动器成雪终止,那么Spark应用也就结束了,主要负责:
1)把用户程序喜欢花为作业(JOB)
2)跟踪Executor的运行状况(Executor是真正的执行器)
3)为执行器节点调度任务
4)UI展示应用运行状况
repartition(numpartitions)案例—numpartitions表示表分区的数量
1)作用:根据分区数,重新通过网络随机洗牌所有数据。
2)需求:创建一个4个分区的RDD,随其重新分区。
//创建一个4个分区的RDD
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
//收集一下看看
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)
//分区从重新调试变成 2
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at repartition at <console>:25
//收集一下看看rerdd ---顺序已经打乱
scala> rerdd.collect
res1: Array[Int] = Array(1, 3, 6, 8, 9, 11, 13, 15, 2, 4, 5, 7, 10, 12, 14, 16)
//使用.glom把分区放在数组里面
scala> rerdd.glom.collect
res2: Array[Array[Int]] = Array(Array(1, 3, 6, 8, 9, 11, 13, 15), Array(2, 4, 5, 7, 10, 12, 14, 16))
scala>
coalesce和repartition的区别
- coalesce重新分区,可以选择是否进行shuffle过程,由参数shuffle:Boolean = false/true决定。(灵活)
- repartition实际上是调用的coalesce,默认是进行shuffle 的。
sortBy(func,[ascendin],[numTasks])案例
-
作用:使用func先对数据进行处理,按照处理后的数据比较结果排序,默认正序排列。
-
需求:创建一个RDD,按照不同的规则进行排序
//创建一个RDD scala> val rdd = sc.parallelize(List(2,1,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24 //默认升序排列 scala> rdd.sortBy(x=> x).collect() res3: Array[Int] = Array(1, 2, 3, 4) //追加false,改为降序排列 scala> rdd.sortBy(x=> x,false).collect() res4: Array[Int] = Array(4, 3, 2, 1) scala>
双Value类型交互
nuion(otherDataset)案例
合并:nuion()— 把两个查询结果集合并在一块
- 作用:对源RDD和参数RDD求并集后返回一个新的RDD
- 需求:创建两个RDD,求并集
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[24] at union at <console>:27
scala> rdd3.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
scala>
subtract(otherDataset)案例
-
作用:计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来
-
需求:创建两个RDD,求第一个RDD与第二个RDD的差集
scala> val rdd2 = sc.parallelize(1 to 5) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24 scala> val rdd1 = sc.parallelize(3 to 8) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24 scala> rdd1.subtract(rdd2).collect() res7: Array[Int] = Array(6, 8, 7) scala>
intersection(otherDataset)案例
-
作用:对源RDD和参数RDD求交集后返回一个新的RDD
-
需求:创建两个RDD,求两个RDD交集。
-
与上面的差相似。
cartesian(otherDataset)案例
-
作用:笛卡尔积(尽量避免使用)
-
需求:创建两个RDD,计算两个RDD的笛卡尔积
-
两张表连接乘积(100*100=10000)
zip(otherDataset)案例
-
作用:将两个RDD组合成key/cakue形式的RDD,这里默认两个RDD的partition数量以及元素数量相同,否则会出现异常。
-
需求:创建两个RDD,并将两个RDD组合个到一起形成一个(k,v)RDD
-
注意:scala中两个拉链个数不一样的时候,也可以省略掉了多的数,但是在spark中拉链的个数一定要相同
-
两个拉链的分区数也要相同,
scala> val rdd1 = sc.parallelize(Array(1,2,3),3) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:24 scala> val rdd2 = sc.parallelize(Array("a","b","c"),3) rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[37] at parallelize at <console>:24 scala> rdd1.zip(rdd2).collect res8: Array[(Int, String)] = Array((1,a), (2,b), (3,c)) scala>
-
-
-
Key-Value类型
partitionBy案例 — 通过什么什么分区
-
作用:对 pairRDD 进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区,否则回生成Shuffle RDD,即回产生sheffle过程。
-
需求:创建一个4个分RDD,对其冲新分区。
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[39] at parallelize at <console>:24 scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2)) rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[40] at partitionBy at <console>:25 scala> rdd2.collect res9: Array[(Int, String)] = Array((2,bbb), (4,ddd), (1,aaa), (3,ccc)) //取模排列的 scala> rdd2.glom.collect res10: Array[Array[(Int, String)]] = Array(Array((2,bbb), (4,ddd)), Array((1,aaa), (3,ccc))) scala>
自定义分区器
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark13_Oper12 {
def main(args: Array[String]): Unit = {
val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//创建Spark上下文对象
val sc = new SparkContext(config)
val listRDD = sc.makeRDD(List(("a",1),("b",2),("c",3)))
//指定3个分区
val partRDD:RDD[(String,Int)] = listRDD.partitionBy(new MyPartitioner(3))
}
}
//声明分区器--自己模仿一下Hashpartitioner源码
//继承Partitioner类---》需要重写抽象方法
class MyPartitioner (partitions : scala.Int) extends org.apache.spark.Partitioner {
override def numPartitions: Int = {
//这是一个函数,返回一个partitions
partitions
}
override def getPartition(key: Any): Int = {
//只要前面哪一个值就放在1号了分区里面
1
}
}
grouoByKey案例
-
作用:groupByKey也是对每个key进行操作,但是只生成一个sequence。
-
需求:创建一个pairRDD,将相同的key对应值聚合到一个sequence中,并计算相同的key对应值的相加结果。
-
将同一个key的值放在一个组里面,就根据key分组。
scala> val word = Array("one","two","two","three","three","three") word: Array[String] = Array(one, two, two, three, three, three) scala> val wordPairsRDD = sc.parallelize(word).map(word => (word,1)) wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[43] at map at <console>:26 scala> val group = wordPairsRDD.groupByKey() group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[44] at groupByKey at <console>:25 scala> group.collect res12: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1))) scala> group.map(t => (t._1,t._2.sum)).collect res14: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) //reduceByKey(func,[numTasks])案例 //在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同的key聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。 scala> wordPairsRDD.reduceByKey(_+_).collect res15: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) scala>
reduceByKey和groupByKey的区别
- **reduceByKey:**按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]----效率会变得更高一些
- groupByKey:按照key进行分组,直接进行shuffle
- 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意会影响业务逻辑。
Spark算子分为转换算子和行动算子
Action–>>行动算子
reduce(fune)案例–>>就是化简操作
-
作用:通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
-
创建一个RDD,将是u偶有元素聚合得到结果。
//单一数据的聚合 scala> val rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at makeRDD at <console>:24 scala> rdd1.reduce(_+_) res16: Int = 55 scala> //K-V数据聚合 scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5))) rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[48] at makeRDD at <console>:24 scala> rdd2.reduce((x,y)=>(x._1+y._1,x._2+y._2)) res17: (String, Int) = (aacd,12) scala>
collect () 案例 – 数据收集采集
- 作用:在驱动程序中,以数组的形式返回数据集的所有元素。
- 需求:创建一个RDD,并将RDD内容收集到Driver端打印
scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at parallelize at <console>:24 scala> rdd.collect res18: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala>
count ()案例 — 求数量
- 作用:返回RDD中元素的个数
- 需求建立一个RDD,统计该RDD的条数
scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at parallelize at <console>:24 scala> rdd.collect res18: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> rdd.count res19: Long = 10 scala>
first () 案例
- 返回RDD中的第一个元素
- 需求:创建一个RDD,返回RDD中的第一个元素
taKen(n)案例
- 作用:返回RDD的前n个元素组成的数组
- 需求:创建一个RDD,统计该RDD的条数
takeOrded(n)案例
- 作用:返回RDD排序后的前n个元素组成的数组
- 需求:创建一个RDD,统计该RDD的条数
- 注意:里面默认是升序排列
aggregate案例
- 参数:(zeroValue:U)(seqOp:(U,T)=>U,combOp:(U,U=>U)
- 初始值 分区内相加,分区间相加
- 作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后使用combine函数将每个分区的结果和初始值(zeroValue)操作。这个函数最终返回的类型将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型不一致。
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[50] at makeRDD at <console>:24
// (初始值)(分区内相加,分区间相加)
scala> rdd1.aggregate(0)(_+_,_+_)
res21: Int = 55
//aggregate会把分区内的初始值相加,分区间也会把初始值相加
scala> rdd1.aggregate(10)(_+_,_+_)
res22: Int = 85
scala>
fold (num)(func)案例
- 作用:折叠操作,aggregate的化简操作,seqop和combop一样
- 需求:创建一个RDD,将所有元素相加得到结果。
countByKey()案例
- 作用:针对(K-V)类型的RDD,返回一个(K,Int)的map,表示一个key对应的元素个数。
- 需求:创建一个PairRDD,统计每种Key的个数。
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[51] at parallelize at <console>:24
scala> rdd.countByKey
res23: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
scala>
spark中RDD常用转换算子
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object TestTransformation {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("TeatTransformation")
val sc = SparkContext.getOrCreate(conf)
//对1-10的集合每一个元素*2
// val rdd1:RDD[Int] = sc.parallelize(seq = 1 to 10)
// val rdd2:RDD[Int] = rdd1.map(_*2)
// rdd2.collect().foreach(println(_))
// val rdd1 = sc.parallelize(List("zhangsan","lisi","wangwu"))
// val rdd2:RDD[(String, Int)] = rdd1.map(x => (x,x.length))
// rdd2.collect().foreach(println(_))
// sc.stop()
//使用 .filter()过滤
val rdd1 = sc.parallelize(List("zhangsan","lisi","wangwu"))
val rdd2 = rdd1.filter(_.length > 7)
sc.stop()
//使用mapValuse()
val rdd1 = sc.parallelize(List("zhangsan","lisi","wangwu"))
val b= a.map(x=>(x.length,x))
val c = b.mapValues("x"+_+"x")
c.collect.foreach(println(_))
sc.stop()
//RDD常用算子的计算.distinct
val add1 = sc.parallelize(List(1,2,3,4,5,6,7))
rdd.distinct().collect().foreach(println(_))
println(rdd1.distinct(numPartitions = 8).parttitions.length)
sc.stop()
val rdd1 = sc.testFile("in/a.txt")
val rdd2 = rdd1.flatMap(_.split("")).map(_,1)
val rdd3 = rdd2.groupByKey().map((x._1,x._2.size) => (x,y.size):)
}
}
聊Spark入门
为什么有Spark?
这就是spark与Hadoop的区别:
1)移动算子(Spark)还是移动数据(Hadoop)
2)Spark需要落盘,新建集群,分布在很多节点上(Hadoop干的事),大数据就是分布式(让更多的人处理很多的硬件。Hadoop的每个数据放在每一个节点上,只能矗立在自己的盘浮上面,在硬盘上面。速度相当慢)
3)Hadoop存储完之后需要汇报,要把数据现存起来,然后后期在排查,数据迁移的工作。本质上还是速度的原因,无法实时数据处理。处理离线型数据,不能处理及时文件。数据使用128MB开始分割(躲数据),Hadoop在做数据迁移,汇成一个完成的数据。
4)Spark不需要落盘需要手续,它大部分不要落盘,有时候也需要落盘,很少很少。数据都写在内存里面,内存容量比较小,不稳定,专做即及时性数据,也可以处理离线式数据。Spark先让数据在节点上处理(使用函数)最后把数据拉回来,Spark发布命令函数给节点取处理数据,
5)Spark现在无法取代HDFS,Spark想取代Map Reduce,通常梳理流数据,Spark在内存上处理,面试出去说2.X版本,他点就是,支持各种语言。
Spark 技术栈:
今天讲Spark SQL步骤,ML机器学习,GraphX是图数据库
Spack底层是Scala开发的。
Spark初体验:
运行架构:
最后结果即可,不用移动数据,全程都在自己的节点上(前面负责给函数就行了,最后返回消息即可)
DricerProgram拿到算子
Cluster Manager是中间传递的(传函数,信息,结果等等),Worker Node是执行器,执行器里面有Cashe,可以与隔壁的一个Executor调换数据(互相沟通),
猫里面可以放wok站点的,Spark的SparkContext也可以放上下文环境。
Spark Context RDD就是一个函数(分布式弹性算子),把函数分布给多个数据节点
RDD -> function falatMap map reduceByKey
public (返回函数或返回一个对象) Object abc(int size){
String sql = "select * from userinfos";
ResultSet rs = .... //算子即使函数体
RDD 是一个算子,或者数据存储结构(可以存放数据) 是分布式的
}
RDD相当于一个函数,函数由SparkContext.getOrCreate来点的。
Spark可以不需要启动Hadoop,一般情况下还是需要安装Hadoop的。
sbin启动环境 bin在魂晶内部使用的命令
黑界面里面you默认的sc,不能重新创建了。
算子:转换算子(转化算子) 动作算子(最后溢出的时候常出现)
落盘:在数据迁移的时候会落盘。
上一篇: Python Roberts算子、Sobel算子——举例说明 ^_^
下一篇: RDD与算子
推荐阅读