欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark 算子

程序员文章站 2022-06-01 18:15:06
...

Spark大数据计算框架

Spark(火花)出现不全是代替Map Reduce,里面有批处理(Spark运行在内存中,Map Reduce运行在硬盘中,根据自身情况选择适应的开发环境(比如:机器的数量,配置,资金等等))

了解Hadoop发展历史:2011年发布1.0版本,2012年发布稳定版,2013年发布2.X版本(里面出现了Yarn,得以重用)

2012年版本中

MR的缺点:

mr基于数据集的计算,所以面向数据

​ 1.基本运算规则从存储介质中获取(采集)数据,然后进行计算,最后将结果存储到介质中,所以主要应用于一次性计算(单节点计算),不适合数据挖掘和机器学习这样的迭代计算和图形挖掘计算。

​ 2.MR基于文件存储介质操作,所以性能非常慢

​ 3.MR和Hadoop紧密耦合在一起,无法动态替换(违背了ocp原理)

​ 4.RM中,之前的Job任务太重了,导致数据运算太慢。(既要做资源调度任务,又要做资源交互)后来Yarn就把Job职责分开(只做资源交互),AM:负责资源调度任务(AM里面有Driver,Driver与Task关联,RM和NM关联(解耦合了),中间有一个AM(资源,计算:两者之间有AM起到中间连接的作用))最终导致我们的计算框架是可插拔的(也就是Yarn计算,至今还在用RM的原因就是Yarn的存在。)

  • M中:NM与Task之间也有一个中间链接Conntainer,Container相当于当前电脑里面的虚拟机,他会占用计算机的一定资源来进行计算,但是他的计算和外面的计算又没有关系,降低了偶合性。
  • MR:比较繁杂,只有Map和Reduce两种操作,复杂的逻辑需要大量的样板代码,处理效率低,Map中甲结果写磁盘,Reduce写HDFS,多个Map通过HDFS交换数据

2013年10月发布2.X版本(Yarn):Yarn作用(就是资源调度框架),所以Yarn主要用于计算,和存储没有太大关系(存储是HDFS执行的)

Yarn架构图

Spark 算子

Spark2013年6月发布

定义:Spark是一种基于内存的快速,通用,可扩展的大数据分析引擎

Spark基于Hadoop1.X架构思想,采用自己的方式去改善Hadoop1.X中的问题

交互式shell方便开发测试。

速度快,易用性强(支持java,Scala,Python,R语言,),通用性(客运与机器学),随处运行(Yarn,Mesos,Stand alone(Scala单机模式),Local(与Map Reduce运行很相似))。

Saprk基于内存,并且基于Scala语法开发,所以天生适合迭代式计算(这就是比Map Reduce好的原因)。

Master做资源调度的,Worker和NodeManager相似

资源调度里面的计算:在ApplicationMaster运行,

执行器:是Executor计算的

Saprk存储是在Hadoop中存储的

Saprk和Yarn做计算框架,HDFS做存储框架。

Spark架构设计->运行架构图

Spark 算子

中间有ClusterManager作为中间传呼变量。保证两者之间没有耦合性。

Spark 算子

Spark内置模块

Spark 算子

两个重要角色:Driver(驱动器)可以理解为管理者,Executor(执行器)可以理解为执行者(计算的)

查看Yarn当前运行路径和配置信息:yarn application -list

19/09/29 21:36:32 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/09/29 21:36:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):0
                Application-Id	    Application-Name	    Application-Type	      User	     Queue	             State        Final-State	       Progress	                       Tracking-URL
[aaa@qq.com ~]# 

[aaa@qq.com bin]# ./spark-shell 
19/09/29 21:11:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://bigdata:4040
Spark context available as 'sc' (master = local[*], app id = local-1569805902675).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/
      
      其中:app id = local-1569805902675与Application-Id是一样的。

独立调度器是HDFS自己的支援调度器,Yarn是Hadoop的资源调度,Mesos是Apart另外一个资源调度(国外盛行)

Spark技术栈:

面试Spark版本:1.6或者2.2(主要)版本

Spark 算子

自己虚拟机上安装:

Spark 算子

运行模式:

Local模式:Local[ * ]模式就直接帮你按照cpu最多cores来设置线程数了。

共享文件夹:

直接搜索–》打开文件资源管理器–》输入ftp://192.168.10.150/–》密码:1–》直接下载下来。

Spark架构核心组件:

术语 说 明
Application 建立在Spark上的用户程序,包括Driver代码和运行在集群各节点Executor中的代码
Driver program 驱动程序。Application中的main函数并创建****SparkContext
Cluster Manager 在集群(StandaloneMesosYARN)上获取资源的外部服务
Worker Node 集群中任何可以运行Application代码的节点
Executor 某个Application运行在worker节点上的一个进程
Task 被送到某个Executor上的工作单元
Job 包含多个Task组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个****Job
Stage 每个Job会被拆分成多组Task,作为一个TaskSet,其名称为****Stage

在IDEA里面展示Spark程序,先设置xml文件

//书写格式要标准
//从官网下载
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.11</artifactId>
	<version>2.4.3</version>
</dependency>

SparkSession

  • Spark2.0+应用程序

Spark 算子


查看Spark程序运行界面的路径是:192.168.56.101:4040

Spark - WordCount实现

sc:表示Spark的上下文对象,本身就可以找到相关路径取读数据。
textFile(传送路径/相对路径):通过一种方法来得到文本文件的内容,这里面是传送路径(把文件传进来)读数据的额方式是一行一行读的。
flatMap(_.split(" ")):表示把文件传出去(可迭代)扁平化压平操作,按照空格分隔符将一行数据映射成一个个单词;

map((_,1)):返回值【map(x=>{(x,1)})简写成:map((_,1))】:对每一个元素操作,将单词映射为元组。
reduceByKey(_+_):相同的 Key 进行聚合,reduce聚合的时候是两两相加的,第一个参数作为第一个参数,第一个参数作为第二个参数(最后得到的而结果就是元组)。
collect:收集展现在控制台上(Driver)
scala> sc.textFile("/root/a.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res5: Array[(String, Int)] = Array((mm,1), (hello,4), (cmmagk,1), (world,1), (lice,1), (makkk,1), (kb02,2))
scala> 

在IDEA中实现Spark步骤:

创建方法一个maven工程,不走前期博客可见(类似)

//使用 SparkSession之前需要导包
<dependency> 
	<groupId>org.apache.spark</groupId>  
	<artifactId>spark-sql_2.11</artifactId> 
	<version>2.4.3</version>
</dependency>

Spark 算子

//运行代码

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //使用开发工具完成Spark WordCount的开发

    //LocaL模式
    //创建SparkConf对象
    //设定Spark设计框架的运行(部署)环境
    //App id
    val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")

    //创建一个Spark上下文对象sc
    val sc = new SparkContext(config)

    //读取文件,将文件内容一行一行的读取出来,这里的String是一行的意思
    val lines:RDD[String] = sc.textFile("d:/a.txt")

    //将一行一行的数据分解一个一个的单词,这里的String是一个一个的意思
    val words:RDD[String] = lines.flatMap(_.split(" "))

    //为了统计方便,将单词数据进行结构数据转换
    val wordToOne:RDD[(String,Int)] = words.map((_,1))

    //对转换结构后的数据进行分组聚合
    val wordToSum:RDD[(String,Int)] = wordToOne.reduceByKey(_+_)

    //将统计结果采集后打印到控制台
    val result:Array[(String,Int)] = wordToSum.collect()

    //println(result)
    result.foreach(println)
  }
}

运行结果:

Spark 算子

Spark Yarn部署流程:

Spark 算子

Spark 算子

Java IO回顾:

Java IO中体现装饰者设计模式,表示一种功能的扩展(比如:以前没有这个功能,但是现在我可以包装一下,修饰一下就有这个功能了)。

输入I,输出O

字节(rar,zip,dat,png,jpeg),字符(txt)字符流里面可以一行一行的读数据(BuffereReader(入),BuffereWriter(出))

//文件输入流

Input Stream in = new FileInputStream(“xxxxxxxx”)

//缓冲流,解决前面文件传输效率的问题

InputStream bufferIn = new BufferedInputStream( new FileInputStream(“xxxxxxx”))

Spark 算子

//文件输入流

Input Stream in = File Input Stream(“xxxxxx”)

//使用字符流读取一行数据

Reader reader = new BuffereReader(new FileInputStream( in," UTF-8 " ))

Spark 算子

RDD编程 - - - 编程模型

Spark中,RDD被标示为对象,通过对象的方法调用来的RDD进行转换。金国一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count,collect等),或者时向存储系统保存数据(saveAsTextFile等),在Spark中只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。

mSpark RDD 概念

  • RDD是将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存中,并执行正确的的操作
  • RDD是用于数据转换的接口
  • RDD指向了存储在HDFS,Cassandra,HBase等,或缓存(内存,内存+磁盘,仅磁盘等),或在故障或缓存回收时重新计算其他RDD分区中的数据。
  • RDD是弹性分布式数据集(Resilient Distributed Datasets)
    • RDD是只读的,分区记录的集合,每个分区
    • RDD

RDD的特性:

一系列的分区(分片)信息,每个任务处理一个分区
每个分区上都有compute函数,计算该分区中的数据
RDD之间有一系列的依赖
分区函数决定数据(key-value)分配至哪个分区
最佳位置列表,将计算任务分派到其所在处理数据块的存储位置

创建RDD的方式,两种

val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.count
//下面两个都是从内存创建,底层实现时一样的。
rdd.partitions.size
//一般正常使用 .makeRDD
val rdd = sc.makeRDD(List(1,2,3,4,5,6))

rdd.partitions.size
val rdd = sc.parallelize(List(1,2,3,4,5,6),5)

RDD概述

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据(计算)抽象。代码中是一个抽象类,他代表一个不可变,可分区,里面的元素可并行计算集合。

RDD:体现了装饰者设计模式,是将数据处理逻辑进行封装。

Spark 算子

并行与并发的区别:

并发:线程并发(一个CPU的内核有多个线程来抢栈资源,这时只有一个线程资源能抢占上,其他的都抢占不上,应为线程堵塞了,最后线程释放之后其他的线程才有机会),这个时间会非常快。

并行:多个CPU迎接多个线程资源,不用抢占,可以同时执行多个任务。

RDD特点:

RDD表示只读的分区数据集,对RDD进行改动,只能通过RDD的上转换操作,有一个个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必须的信息。RDD之间存在依赖,RDD的执行按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。

算子:解决问题其实就是将问题的初始状态,通过一系列的操作(算子Operate)对问题的状态进行转换,然后达到完成(解决)状态。

Spark算子分为两大类:(1)转换算子(2)行动算子

RDD的转换(面试开发重点)

map(func)案例----映射,其实就是一种结构的变化

map是对每一天小数据做操作的

1)作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换组成

2)需求:创建一个1-10数组的RDD,将所有元素*2新城新的RDD

//创建
scala> var source = sc.parallelize(1 to 10)
//打印
scala>var source.collect()
res1:Array[Int] = Array(1,2,3,4,5,6,7,8,9,10)
//将所有元素*2
scala> val mapadd = source.map(_*2)
//打印最终结果
scala>mapadd.collect()


    
//在IDEA中展现
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_Oper1 {
  def main(args: Array[String]): Unit = {
    val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //map算子
    val listRDD:RDD[Int] = sc.makeRDD(1 to 10)

    val mapRDD:RDD[Int] = listRDD.map(x => x*2)//x => x*2也可以写成 _*2,因为x当前这个参数只是用一次,可以省略。
	//行动算子   .collect()
    mapRDD.collect().foreach(println)
  }
}

mapPartitiond(func)案例 - - ->>对每一条分区做操作

1)作用:类似于map,但是独立地在RDD色每一个 (分区) 上运行,因此在类型为T的 RDD 上运行时,func的函数类型必须是Iterator [ T ] = > Iterator [ U ]。假设没有N个元素,由M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。

2)需求:创建一个RDD,使每个元素 *2 组成新的RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_Oper2 {
  def main(args: Array[String]): Unit = {
    val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //创建Spark上下文对象
    val sc = new SparkContext(config)
    //mapPartitions
    val listRDD:RDD[Int] = sc.makeRDD(1 to 10)
    //mapPartitions可以对一个RDD中所有的分区进行遍历
    //mapPartitions效率优于map算子,减少了发送到执行器的交互次数。
    //严重问题:mapPartitions可能会产生内存溢出(简称:OOM),不释放(里面有引用程序存在),。
    val mapPartitionsRDD:RDD[Int] = listRDD.mapPartitions(datas=>{datas.map(data=>data*2)})

    mapPartitionsRDD.collect().foreach(println)
  }
}

map与mapPartitions的区别:

1)map() :每次处理一条数据,做映射和转换用的(转换结构)

2)mapPartition():每次处理一个分区的数据,这个分区的数据处理完毕后,原RDD中分区的数据才能被释放,可能导致OOM(内存溢出)

3)开发指导:当内存空间较大的时候建议使用mapPartition(),以提高效率。

Spark 算子

Spark 算子

mapPartitionsWithIndex(func)案例 — 有索引的概念(0,1,2,3号分区等)

1)作用:类似于mapPartitions,但是func带有一个整数参数表示分片的索引值,因此在类型为 T 的RDD上运行时,func的函数类型必须是(Int,Interator[ T ])=> Iterator[ U ];

2)需求:创建一个RDD,是每个元素跟所在分区形成一个元组组成一个新的RDD

Spark 算子

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_Oper3 {
  def main(args: Array[String]): Unit = {
    val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //mapPartitions
    //这里的 2 表示分区数
    val listRDD:RDD[Int] = sc.makeRDD(1 to 10 ,2)
    //在多个参数的时候使用模式匹配,一般写法使用 { },可能会有多行
    val tupleRDD: RDD[(Int,String)] = listRDD.mapPartitionsWithIndex {
      //case( 函数入口 )=> { 逻辑 }
      //case(分区号,数据集合)=>{返回的还是数据集合}
      case (num, datas) => {
        datas.map((_, "分区号" + num))
      }
    }
    tupleRDD.collect().foreach(println)
  }
}

Spark中Driver & Executor的区别

Spark 算子

Driver与Executor可能不在一台机器上面,就需要通过网络传送数据。

所有RDD算子的计算功能都是Executor来执行的,其他的都在Driver中执行的(需要序列化)

flatMap(func)案例 — 扁平化处理

1)作用**:类似于map,但是每一个输入元素可以被映射为0或多个输出元素**(所以func应该返回一个序列,而不是一个单元)

2)需求:创建一个元素1-5的RDD,运用flat Map创建一个新的RDD,新的RDD为原RDD的每个元素的两倍(2,4,6,8,10)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark05_Oper4 {
  def main(args: Array[String]): Unit = {
    val config:SparkConf = new SparkConf().setMaster("LOCAT[*]").setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    val listRDD:RDD[List[Int]] = sc.makeRDD(Array(List(1,2),List(3,4)))

    //flatMap
    //1,2,3,4
    val flatMapRDD:RDD[Int] = listRDD.flatMap(datas=>datas)
    flatMapRDD.collect().foreach(println)
  }
}

glom案例

1)作用:将每一个分区新城一个数组,形成新的RDD类型时RDD [ Array[ T ] ],可以进行最值,转换等

2 ) 需求:创建一个2个分区的RDD,并将每个分区的数据梵高一个数组

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_Oper5 {
  def main(args: Array[String]): Unit = {
    val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    val listRDD:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8,9),2)

    //将一个分区的数据放到一个数组中
    val glomRDD:RDD[Array[Int]] = listRDD.glom()

    glomRDD.collect().foreach(array=>{
      //mkString(",")  将集合转化成字符串
      println(array.mkString(","))
    })
  }
}

groupBy(func)案例

1)作用:分组,按照传入函数的返回值进行分组,将相同的key对应的值放入一个迭代器。

2)需求:创建一个RDD,按照元组模以2的值进行分组。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Soark07_Oper6 {
  def main(args: Array[String]): Unit = {
    val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //生成数据,按照指定的规则进行分组
    //分组后的数据形成了对偶元组(K-V),K表示分组得分key,V表示分组的数据集合
    val listRDD:RDD[Int] = sc.makeRDD(List(1,2,3,4))

    val groupByRDD:RDD[(Int,Iterable[Int])] = listRDD.groupBy(i=>i%2)

    groupByRDD.collect().foreach(println)
  }
}

filter(func)案例

1)作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

2)需求:创建一个RDD(由字符串组成),过滤出一个新的RDD(包含“xiao”子串)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark08_Oper7 {
  def main(args: Array[String]): Unit = {
    val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //生成数据,按照指定的规则进行过滤
    val listRDD:RDD[Int] = sc.makeRDD(List(1,2,3,4))

    val filterRDD:RDD[Int] = listRDD.filter(x=>x%2==0)
    filterRDD.collect().foreach(println)
  }
}

sample()案例 - - ->> 采样(抽样)

1)作用:以指定的随机种子随机抽样出数量为fraction的数据,with Replacement表示是抽出的数据是否放回,true为返回的抽样,false为五方会的抽样,seed用于指定随机数生成器种子。

2)在Java中没有真正的随机数,都是通过一套随机算法,按照某种计算规则的出相应的结果,只要随机数种子相同,后面的随机数就相同,因为都是按照相同的随机算法执行的。

注意:一般情况下都是拿时间戳作为随机数种子,这样可以避免种子相同产生随机数据不变的情况。

数据倾斜:就是数据分配不均衡造成的。解决方法:加随机数,反转,加时间戳等等

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark09_Oper8 {
  def main(args: Array[String]): Unit = {
    val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //生成数据,按照指定的规则进行过滤
    val listRDD:RDD[Int] = sc.makeRDD(1 to 10)
     //从指定的数据集合中进行抽样处理,根据不同的算法进行抽样。
	//这里面的0.4代表一种比较标准,通过随机数算法,进行比较得出数据
     //只要种子相同(比如:0.4)则无论那一次,结果都是一样的。
     //这里的false表示不放回的抽取
    val sampleRDD:RDD[Int] = listRDD.sample(false,0.4,1)

    sampleRDD.collect().foreach(println)
  }
}

distinct([numTasks])案例—去重操作

**注意:**将rdd中一个分区的数据打乱重新组合到其他不同分区的操作,称之为shuffle操作,只要是一个分区中没有打乱,则不能称作shuffle操作(shuffle有一个洗牌的概念)。(这里面的数据通过sheffle过程将数据打乱重新组合,会出现数据顺序重组。)

在Spark中所有的转换算子中没有shuffle的算子性能比较快。,可以告诉利用资源

1)作用:对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTacks参数改变它。

2)需求:创建一个RDD,使用distinct()对其去重。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark10_Oper9 {
  def main(args: Array[String]): Unit = {
    val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //从指定的数据集合中进行抽样处理,根据不同的算法进行抽样
    val listRDD:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,2,9,7,1))

    //val distinctRDD:RDD[Int] = listRDD.distinct()
    //可以改变分区
    //使用distinct算子对数据去重,但是因为去重后导致数据减少,所以可以改便默认的分区数量
    val distinctRDD:RDD[Int] = listRDD.distinct(2)

    distinctRDD.collect().foreach(println)
  }
}

coalesce(numPartitions)案例— 只是缩减分区(合并分区)只要没有打乱重组就一定没有shuffle

1)作用:缩减分区数,用于大数据过滤后,提高小数据集的执行效率

2)创建一个4个分区RDD,对其缩减分区

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark11_Oper9 {
  def main(args: Array[String]): Unit = {
    val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //缩减分区数
    val listRDD:RDD[Int] = sc.makeRDD(1 to 10 ,4)

    println("缩减分区前="+listRDD.partitions.size)
    val coalesceRDD:RDD[Int] = listRDD.coalesce(3)
    println("所减分区后="+coalesceRDD.partitions.size)
    sc.stop()
  }
}

Driver(驱动器)

Spark的驱动器是执行开发程序中的main方法进程它负责开发人员编写的用来创建Spark shell,创建RDD,以及进行RDD的转化操作和行动操作代码的执行。如果你是用Sparkshell ,那么当你启动Spark shell的时候,系统后台自启了一个Spark驱动器程序,就是在Spark shell中预加载的一个叫做sc的Spark Context对象,如果驱动器成雪终止,那么Spark应用也就结束了,主要负责:

1)把用户程序喜欢花为作业(JOB)

2)跟踪Executor的运行状况(Executor是真正的执行器)

3)为执行器节点调度任务

4)UI展示应用运行状况

repartition(numpartitions)案例—numpartitions表示表分区的数量

1)作用:根据分区数,重新通过网络随机洗牌所有数据。

2)需求:创建一个4个分区的RDD,随其重新分区。

//创建一个4个分区的RDD
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
//收集一下看看
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)
//分区从重新调试变成 2
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at repartition at <console>:25
//收集一下看看rerdd  ---顺序已经打乱
scala> rerdd.collect
res1: Array[Int] = Array(1, 3, 6, 8, 9, 11, 13, 15, 2, 4, 5, 7, 10, 12, 14, 16) 
//使用.glom把分区放在数组里面
scala> rerdd.glom.collect
res2: Array[Array[Int]] = Array(Array(1, 3, 6, 8, 9, 11, 13, 15), Array(2, 4, 5, 7, 10, 12, 14, 16))

scala> 

coalesce和repartition的区别

  1. coalesce重新分区,可以选择是否进行shuffle过程,由参数shuffle:Boolean = false/true决定。(灵活)
  2. repartition实际上是调用的coalesce,默认是进行shuffle 的。

sortBy(func,[ascendin],[numTasks])案例

  1. 作用:使用func先对数据进行处理,按照处理后的数据比较结果排序,默认正序排列。

  2. 需求:创建一个RDD,按照不同的规则进行排序

    //创建一个RDD
    scala> val rdd = sc.parallelize(List(2,1,3,4))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24
    //默认升序排列
    scala> rdd.sortBy(x=> x).collect()
    res3: Array[Int] = Array(1, 2, 3, 4)
    //追加false,改为降序排列
    scala> rdd.sortBy(x=> x,false).collect()
    res4: Array[Int] = Array(4, 3, 2, 1)
    scala> 
    

双Value类型交互

nuion(otherDataset)案例

合并:nuion()— 把两个查询结果集合并在一块

  1. 作用:对源RDD和参数RDD求并集后返回一个新的RDD
  2. 需求:创建两个RDD,求并集
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[24] at union at <console>:27

scala> rdd3.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)

scala> 

subtract(otherDataset)案例

  1. 作用:计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来

  2. 需求:创建两个RDD,求第一个RDD与第二个RDD的差集

    scala> val rdd2 = sc.parallelize(1 to 5)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24
    
    scala> val rdd1 = sc.parallelize(3 to 8)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24
    
    scala> rdd1.subtract(rdd2).collect()
    res7: Array[Int] = Array(6, 8, 7)
    
    scala> 
    
    

    intersection(otherDataset)案例

    1. 作用:对源RDD和参数RDD求交集后返回一个新的RDD

    2. 需求:创建两个RDD,求两个RDD交集。

    3. 与上面的差相似。

      cartesian(otherDataset)案例

      1. 作用:笛卡尔积(尽量避免使用)

      2. 需求:创建两个RDD,计算两个RDD的笛卡尔积

      3. 两张表连接乘积(100*100=10000)

        zip(otherDataset)案例

        1. 作用:将两个RDD组合成key/cakue形式的RDD,这里默认两个RDD的partition数量以及元素数量相同,否则会出现异常。

        2. 需求:创建两个RDD,并将两个RDD组合个到一起形成一个(k,v)RDD

        3. 注意:scala中两个拉链个数不一样的时候,也可以省略掉了多的数,但是在spark中拉链的个数一定要相同

        4. 两个拉链的分区数也要相同,

          scala> val rdd1 = sc.parallelize(Array(1,2,3),3)
          rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:24
          
          scala> val rdd2 = sc.parallelize(Array("a","b","c"),3)
          rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[37] at parallelize at <console>:24
          
          scala> rdd1.zip(rdd2).collect
          res8: Array[(Int, String)] = Array((1,a), (2,b), (3,c))
          scala> 
          

Key-Value类型

partitionBy案例通过什么什么分区

  1. 作用:对 pairRDD 进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区,否则回生成Shuffle RDD,即回产生sheffle过程。

  2. 需求:创建一个4个分RDD,对其冲新分区。

    scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[39] at parallelize at <console>:24
    
    scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
    rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[40] at partitionBy at <console>:25
    
    scala> rdd2.collect
    res9: Array[(Int, String)] = Array((2,bbb), (4,ddd), (1,aaa), (3,ccc))
    //取模排列的
    scala> rdd2.glom.collect
    res10: Array[Array[(Int, String)]] = Array(Array((2,bbb), (4,ddd)), Array((1,aaa), (3,ccc)))
    scala> 
    

    自定义分区器

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark13_Oper12 {
  def main(args: Array[String]): Unit = {
    val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)
    val listRDD = sc.makeRDD(List(("a",1),("b",2),("c",3)))
    //指定3个分区
    val partRDD:RDD[(String,Int)] = listRDD.partitionBy(new MyPartitioner(3))

  }
}
//声明分区器--自己模仿一下Hashpartitioner源码
//继承Partitioner类---》需要重写抽象方法
class MyPartitioner (partitions : scala.Int) extends org.apache.spark.Partitioner {
  override def numPartitions: Int = {
    //这是一个函数,返回一个partitions
    partitions
  }
  override def getPartition(key: Any): Int = {
    //只要前面哪一个值就放在1号了分区里面
    1
  }
}

grouoByKey案例

  1. 作用:groupByKey也是对每个key进行操作,但是只生成一个sequence。

  2. 需求:创建一个pairRDD,将相同的key对应值聚合到一个sequence中,并计算相同的key对应值的相加结果。

  3. 将同一个key的值放在一个组里面,就根据key分组。

    scala> val word = Array("one","two","two","three","three","three")
    word: Array[String] = Array(one, two, two, three, three, three)
    
    scala> val wordPairsRDD = sc.parallelize(word).map(word => (word,1))
    wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[43] at map at <console>:26
    
    scala> val group = wordPairsRDD.groupByKey()
    group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[44] at groupByKey at <console>:25
    
    scala> group.collect
    res12: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
    
    scala> group.map(t => (t._1,t._2.sum)).collect
    res14: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
    //reduceByKey(func,[numTasks])案例
    //在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同的key聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
    scala> wordPairsRDD.reduceByKey(_+_).collect
    res15: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
    scala> 
    

reduceByKey和groupByKey的区别

  1. **reduceByKey:**按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]----效率会变得更高一些
  2. groupByKey:按照key进行分组,直接进行shuffle
  3. 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意会影响业务逻辑。

Spark 算子

Spark算子分为转换算子和行动算子

Action–>>行动算子

reduce(fune)案例–>>就是化简操作

  1. 作用:通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。

  2. 创建一个RDD,将是u偶有元素聚合得到结果。

    //单一数据的聚合
    scala> val rdd1 = sc.makeRDD(1 to 10,2)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at makeRDD at <console>:24
    
    scala> rdd1.reduce(_+_)
    res16: Int = 55
    
    scala> 
    //K-V数据聚合
    scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[48] at makeRDD at <console>:24
    
    scala> rdd2.reduce((x,y)=>(x._1+y._1,x._2+y._2))
    res17: (String, Int) = (aacd,12)
    scala> 
    

    collect () 案例 – 数据收集采集

    1. 作用:在驱动程序中,以数组的形式返回数据集的所有元素。
    2. 需求:创建一个RDD,并将RDD内容收集到Driver端打印
    scala> val rdd = sc.parallelize(1 to 10)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at parallelize at <console>:24
    
    scala> rdd.collect
    res18: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> 
    
    

    count ()案例 — 求数量

    1. 作用:返回RDD中元素的个数
    2. 需求建立一个RDD,统计该RDD的条数
    scala> val rdd = sc.parallelize(1 to 10)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at parallelize at <console>:24
    
    scala> rdd.collect
    res18: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> rdd.count
    res19: Long = 10
    
    scala> 
    
    

    first () 案例

    1. 返回RDD中的第一个元素
    2. 需求:创建一个RDD,返回RDD中的第一个元素

taKen(n)案例

  1. 作用:返回RDD的前n个元素组成的数组
  2. 需求:创建一个RDD,统计该RDD的条数

takeOrded(n)案例

  1. 作用:返回RDD排序后的前n个元素组成的数组
  2. 需求:创建一个RDD,统计该RDD的条数
  3. 注意:里面默认是升序排列

aggregate案例

  1. 参数:(zeroValue:U)(seqOp:(U,T)=>U,combOp:(U,U=>U)
  2. 初始值 分区内相加,分区间相加
  3. 作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后使用combine函数将每个分区的结果和初始值(zeroValue)操作。这个函数最终返回的类型将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型不一致。
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[50] at makeRDD at <console>:24
//                   (初始值)(分区内相加,分区间相加)
scala> rdd1.aggregate(0)(_+_,_+_)
res21: Int = 55
//aggregate会把分区内的初始值相加,分区间也会把初始值相加
scala> rdd1.aggregate(10)(_+_,_+_)
res22: Int = 85
scala> 

fold (num)(func)案例

  1. 作用:折叠操作,aggregate的化简操作,seqop和combop一样
  2. 需求:创建一个RDD,将所有元素相加得到结果。

countByKey()案例

  1. 作用:针对(K-V)类型的RDD,返回一个(K,Int)的map,表示一个key对应的元素个数。
  2. 需求:创建一个PairRDD,统计每种Key的个数。
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[51] at parallelize at <console>:24

scala> rdd.countByKey
res23: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
scala> 

spark中RDD常用转换算子

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TestTransformation {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("TeatTransformation")
    val sc = SparkContext.getOrCreate(conf)
    //对1-10的集合每一个元素*2
//    val rdd1:RDD[Int] = sc.parallelize(seq = 1 to 10)
//    val rdd2:RDD[Int] = rdd1.map(_*2)
//    rdd2.collect().foreach(println(_))

//	val rdd1 = sc.parallelize(List("zhangsan","lisi","wangwu"))
//    val rdd2:RDD[(String, Int)] = rdd1.map(x => (x,x.length))
//    rdd2.collect().foreach(println(_))
//    sc.stop()

	//使用  .filter()过滤
    val rdd1 = sc.parallelize(List("zhangsan","lisi","wangwu"))
    val rdd2 = rdd1.filter(_.length > 7)
    sc.stop()
    
    //使用mapValuse()
    val rdd1 = sc.parallelize(List("zhangsan","lisi","wangwu"))
    val b= a.map(x=>(x.length,x))
    val c = b.mapValues("x"+_+"x")
    c.collect.foreach(println(_))
    sc.stop()
    
    //RDD常用算子的计算.distinct
    val add1 = sc.parallelize(List(1,2,3,4,5,6,7))
    rdd.distinct().collect().foreach(println(_))
    println(rdd1.distinct(numPartitions = 8).parttitions.length)
    sc.stop()
    
    val rdd1 = sc.testFile("in/a.txt")
    val rdd2 = rdd1.flatMap(_.split("")).map(_,1)
    val rdd3 = rdd2.groupByKey().map((x._1,x._2.size) => (x,y.size):)
  	
  }
}

聊Spark入门

为什么有Spark?

这就是spark与Hadoop的区别:

1)移动算子(Spark)还是移动数据(Hadoop)

2)Spark需要落盘,新建集群,分布在很多节点上(Hadoop干的事),大数据就是分布式(让更多的人处理很多的硬件。Hadoop的每个数据放在每一个节点上,只能矗立在自己的盘浮上面,在硬盘上面。速度相当慢)

3)Hadoop存储完之后需要汇报,要把数据现存起来,然后后期在排查,数据迁移的工作。本质上还是速度的原因,无法实时数据处理。处理离线型数据,不能处理及时文件。数据使用128MB开始分割(躲数据),Hadoop在做数据迁移,汇成一个完成的数据。

4)Spark不需要落盘需要手续,它大部分不要落盘,有时候也需要落盘,很少很少。数据都写在内存里面,内存容量比较小,不稳定,专做即及时性数据,也可以处理离线式数据。Spark先让数据在节点上处理(使用函数)最后把数据拉回来,Spark发布命令函数给节点取处理数据,

5)Spark现在无法取代HDFS,Spark想取代Map Reduce,通常梳理流数据,Spark在内存上处理,面试出去说2.X版本,他点就是,支持各种语言。

Spark 技术栈:

Spark 算子

今天讲Spark SQL步骤,ML机器学习,GraphX是图数据库

Spack底层是Scala开发的。

Spark初体验:

运行架构:

Spark 算子

最后结果即可,不用移动数据,全程都在自己的节点上(前面负责给函数就行了,最后返回消息即可)

DricerProgram拿到算子

Cluster Manager是中间传递的(传函数,信息,结果等等),Worker Node是执行器,执行器里面有Cashe,可以与隔壁的一个Executor调换数据(互相沟通),

猫里面可以放wok站点的,Spark的SparkContext也可以放上下文环境。

Spark Context RDD就是一个函数(分布式弹性算子),把函数分布给多个数据节点

RDD -> function falatMap  map  reduceByKey
public (返回函数或返回一个对象) Object abc(int size){
	String  sql = "select * from userinfos";
	ResultSet rs = ....                     //算子即使函数体
	RDD 是一个算子,或者数据存储结构(可以存放数据)   是分布式的
	
}

RDD相当于一个函数,函数由SparkContext.getOrCreate来点的。

Spark可以不需要启动Hadoop,一般情况下还是需要安装Hadoop的。

Spark 算子

sbin启动环境 bin在魂晶内部使用的命令

黑界面里面you默认的sc,不能重新创建了。

算子:转换算子(转化算子) 动作算子(最后溢出的时候常出现)

落盘:在数据迁移的时候会落盘。

相关标签: Spark算子