Spark入门篇——安装
目录
Spark
简介
Apache Spark是一种快速、通用、可扩展的大数据分析引擎。 它提供了 Java, Scala, Python 和 R 的高级 API,以及一个支持通用的执行图计算的优化过的引擎. 它还支持一组丰富的高级工具, 包括使用 SQL 处理结构化数据处理的 Spark SQL, 用于机器学习的 MLlib, 用于图计算的 GraphX, 以及 Spark Streaming,Spark是基于内存计算的大数据并行计算框架。
特点
- 快速:比Hadoop中的MapReduce快100倍。(相对于Hadoop3.0之前来说的,据说Hadoop3.0的MapReduce要赶超Spark)
- 易用:Spark支持Java,Python,Scala和R的API
- 通用:Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。
- 兼容性:Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等
集群搭建
环境说明
ip | hostname |
192.168.8.11 | cma1 |
192.168.8.12 | cma2 |
192.168.8.13 | cma3 |
做好免密登录且要关闭防火墙:
systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall开机启动
firewall-cmd --state #查看防火墙状态
软件说明
软件 | 版本 |
Linux | Centos7 |
Java | 1.8 |
Scala | 2.11.8 |
Spark | 2.4.4 |
由于篇幅原因除Spark以外的其他环境搭建暂不列出。
安装
- 从https://www.apache.org/dyn/closer.lua/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz下载Spark安装包。
- 上传至服务器cma1.
- 解压Spark包到指定目
tar -zxvf spark-2.4.4-bin-hadoop2.7.tgz -C apps/
- 进入conf目录下后修改相应文件名
mv spark-env.sh.template spark-env.sh mv slaves.template slaves
- 修改spark-env.sh的内容
#Java家目录 export JAVA_HOME=/usr/jdk1.8.0_141 #master主机名 export SPARK_MASTER_HOST=cma1 #master端口 export SPARK_MASTER_PORT=7077
- 修改slaves向其中添加所有从节点的主机名
cma2 cma3
-
添加环境变量
vim /etc/profile
启停
单独启动master:
./sbin/start-master.sh
启动所有worker:
./sbin/start-slaves.sh
单独停止master:
./sbin/stop-master.sh
停止所有的worker:
./sbin/stop-slaves.sh
访问 主机名:8080 可以进入SparkUI:
常用术语
Application
Application是指用户编写的应用程序,其中包括一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码。
Driver
Spark中的Driver即运行Application中的main函数并创建SparkContext,其目的是为了准备Spark应用程序的运行环境,在Spark中由SparkContext负责与ClusterManager通信,进行资源申请,任务的分配和监控等,当Executor部分执行完毕后,Driver同时负责将SparkContext关闭,通常用SparkContext代表Driver。
Executor
某个Application运行在worker节点上的一个进程,该进程负责运行某些Task,并且将数据存到内存或磁盘上,每个Applicarion都有格则独立的一批Executor。在Spark on Yarn模式下,其进程名为CoardeGrinedExecutorBackend。一个CoardeGrinedExecutorBackend有且仅有一个Executor对象,负责将Task包装成TaskRunner并从线程池中取出一个空闲的线程来运行task。其并行运行的task数量取决于分配给他的cpu个数。
Cluster Manager
指的是在集群上获取资源的外部服务。目前有三种类型:
- Standalon:Spark原生的资源管理,有Master负责资源的分配
- Mesos:与Hadoop MR兼容性良好的一种自语那调度框架
- Yarn:主要是指Hadoop Yarn中的ResourceManager
Worker
建中任何可以运行Application代码的节点,在Standalone模式中指的是通过slave文件配置的Worker节点,在Spark on Yarn模式下就是NodeManager节点。
Task
被发送到某个Executor上的工作单元,和Hadoop MR中的MapTask、ReduceTask概念一样,是运行Application的基本单位,多个Task组成一个Stage,而Task的调度和管理等是由TaskScheduler负责的。
Job
包含多个Task组成的并行计算,往往由Spark action算子触发生成,一个Application中往往会产生多个job。
Stage
每个Job会被Stage,每个Stage可被分为多个Task,作为一个TaskSet,Stage的划分和调度是有DAGSchedulr来负责的,Stage分为ShuffleMapStage和ResultStage,Stage的边界就是发生Shuffle的地方。
DAGScheduler
根据Job构建基于Stage的DAG(Directed Acyclic Graph有向无环图),并且提交Stage给TASKScheduler。其划分Stage的依据就是RDD之间的依赖关系找出开销最小的调度方法。
TaskScheduler
将TaskSet提交给Worker运行,每个Executor运行什么Task就在此处分配的。TaskScheduler维护所有TaskSet,当Executor向Driver发送心跳时,TaskScheduler会根据资源剩余情况分配相应的Task。另外TaskScheduler还维护着所有的Task的运行标签,重试失败的Task。
在不同运行模式中任务调度器具体为:
-
Spark on Standalone模式为TaskScheduler
-
YARN-Client模式为YarnClientClusterScheduler
-
YARN-Cluster模式为YarnClusterScheduler
WordCount
Spark的Hello World,理解了这个就掌握了数据处理的精华。。。。。。
def main(args: Array[String]): Unit = {
//获取session,Spark2.0之后出现的
val session = SparkSession.builder()
//指定master为localhost。注意:在向集群提交任务是这行需要注释掉
.master("local[8]")
//指定Application的名字
.appName("wordCount")
.getOrCreate()
//获取SparkContext
val spark = session.sparkContext
//制作一个数据集
val datas = Seq("java,python","java,scala","c#,c++,c++","python,java","scala,scala")
//制作rdd
val dataRdd: RDD[String] = spark.makeRDD(datas,4)
//进入wordCount
//先map后压平
dataRdd.flatMap(_.split(","))
//对每一条数据拼装成元组(str,1)
.map((_,1))
//通过key对value进行操作,reduceByKey左面的参数代表累加和,右面参数代表当前元组的value
.reduceByKey(_+_)
//遍历结果并打印
.foreach(println)
}
上一篇: RocketMQ入门篇-安装
下一篇: RocketMQ入门篇-安装