Hadoop学习(9)-spark的安装与简单使用
spark和mapreduce差不多,都是一种计算引擎,spark相对于mapreduce来说,他的区别是,mapreduce会把计算结果放
在磁盘,spark把计算结果既放在磁盘中有放在内存中,mapreduce把可能会把一个大任务分成多个stage,瓶颈发生在io,spark有一个叫dag(有向无环图)的东西,可以把多个算子都放在一个stage进行合并。
spark shuffle的时候一定会把数据放在磁盘中,因为如果在shuffle的时候数据丢失,代价特别的昂贵
spark和mapreduce最大的区别是,spark可以把中间结果即放在内存,又可以放在磁盘中。
因为现在内存和以前比已经很大了,能放在内存就放在内存里。但mapreduce是把中间结果放在磁盘中的
,磁盘的io速度太慢了,所以spark比mapreduce快很多了
spark仅仅可以替代的是mapreduce。spark有个很重要的东西是dag(有向无环图)
可以将多个相同的算子合并到一个stage里面(以后学。。)
spark在shuffle的时候一定是在内存中的,但为了保证数据的安全性,也会把数据写入磁盘,不然恢复的时候代价比较大
spark可以运行在hadoop,yarn,mesos(apache的一个资源调度框架),standalone(spark自带的)。。。
spark主节点master,工作节点worker,master可以部署两个,解决单点故障,这时候可以引入zookeeper
安装
首先我们的电脑上要有jdk8以上的版本
还有hadoop的hdfs就行
我们机器的规划可以是两个master,一个zookeeper
我们现在官网上下载一个spark,然后传到你的机器集群中,解压安装到指定目录
然后进入到conf目录
然后修改两个配置文件
vim spark-env.sh
里面含义,指定你的jdk路径,指定你的zookeeper机器都有哪台,指定每台机器的内核数(线程数量),每台机器的使用内存
这里面不用安装scala了,因为spark里面已经安装好了
:r! echo /root/spark //小知识在vim里面输出
linux脚本
一个简单的for循环
for i in {5..8}; do echo $i; done
scp的时候如果机器太多可以写个脚本
for i in {1..5}; do scp -r /root/apps/spark/ hdp-0$i:/root/apps/; done
然后修改slaves文件,把你的worker机器给写入进去就行了
然后把你的spark 给拷贝到其他机器上
然后启动的话sbin下有一个start-all.sh
sbin/start-all.sh 启动即可,这个最好首先配置好master上对其他机器的免密登录
master上的网页显示端口是8080,内部通信端口是7077
现在可以试着配置spark的高可用
修改spark-env.sh
export java_home=/root/apps/jdk1.8.0_141/
#export spark_master_host=hdp-01
#export spark_master_port=7077export spark_daemon_java_opts="-dspark.deploy.recoverymode=zookeeper -dspark.deploy.zookeeper.url=hdp-01:2181,hdp-02:2181,hdp-03:2181,hdp-04:2181 -dspark.deploy.zookeeper.dir=/root/zk_spark"
export spark_worker_cores=8
export spark_worker_memory=6g
修改后启动你的zookeeper
县启动你的集群
sbin/start-all.sh
然后再一台有zookeeper的机器上在启动一个master
sbin/start-master.sh
提交第一个spark程序
提交一个程序需要一个客户端 人家名字叫sparksubmit(driver)
给活跃的master提交任务,
随便找一台机器当做客户端,在安装包里有一个example
[root@hdp-02 spark-2.4.3-bin-hadoop2.7]# bin/spark-submit --master spark://hdp-01:7077 --class org.apache.spark.examples.sparkpi examples/jars/spark-examples_2.11-2.4.3.jar 500
这里只是用了人家的样例,最后数字是采多少样本
在执行任务的时候,master会启动sparksubmit,还有个coarsergrainedexecutorbackend
这个东西是真正执行任务的东西
程序执行完,这几个进程就会被释放。
可以指定参数,可以指定多个master,比如用多大内存,内核数
参数说明:
--master spark://node1.edu360.cn:7077 指定master的地址
--executor-memory 2g 指定每个worker可用内存为2g
--total-executor-cores 2 指定整个集群使用的cup核数为2个
[root@hdp-02 spark-2.4.3-bin-hadoop2.7]# bin/spark-submit --master spark://hdp-01:7077,hdp-02:7077 --class org.apache.spark.examples.sparkpi --executor-memory 2048mb --total-executor-cores 12 examples/jars/spark-examples_2.11-2.4.3.jar 500
spark shell交互式命令行,方便学习和测试,可以写spark程序,也是一个客户端
启动spark shell
bin/spark-shell,这样启动没有启动master,是local模式运行的,是模拟的
bin/spark-shell --master spark://hdp-01:7077
此时执行spark-shell的机器也会执行subsubmit
workder也会启动coarsergrainedexecutorbackend,即使现在还没有提交任务,他们在准备工作
在spark-shell中提交一个wordcount程序.....scale一句搞定
sc.textfile 指定读取哪里的文件,注意要是所有的机器都能访问的,不能只是本地的,最好是hdfs的
sc.textfile("hdfs://hdp-01:9000/wordcount/input").flatmap(_.split(" ")).map((_, 1)).reducebykey(_+_).collect
sc是spark core(rdd)的执行入口
spark都有哪些进程
首先启动master,然后启动worker,worker会向master注册,然后不停发送心跳检测
master,如果只有一个,会把这些信息保存到磁盘,如果有多个,会保存在zookeeper
1.客户端通过命令行参数知道master在哪,并设置一些参数,比如内存资源和内核数,会先添加任务,然后向master申请资源
2.然后master就会在worker里面查找,负责资源调度(就是将executor在哪些worker启动)
3.master和work进行rpc通信,让worker启动executor(将分配的参数传递过去)然后worker启动executor。
4.接下来,executor和客户端sparksubmit进行通信(通过master-》worker-》execotor这样知道客户端在哪)。
5.然后把sparksubmit把真正的计算逻辑生成task发送给executor。
6.然后再executor执行真正的计算逻辑
yarn和spark的standalone调度模式对比
resourcemanager master 管理子节点,资源调度,接受任务请求
nodemanager worker 管理当前结点,并管理子进程
yarnchild executor 运行真正的计算逻辑的
client sparksubmit (client+applicationmaster)提交任务,管理该任务的executor,并将task提交给集群
applicationmaster
用scala语言简单写一个wordcount
import org.apache.spark.rdd.rdd import org.apache.spark.{sparkconf, sparkcontext} / object wordcount { def main(args: array[string]): unit = { val conf = new sparkconf() //spark程序的入口 val sc = new sparkcontext(conf) //得到在哪开始读取数据 val lines: rdd[string] = sc.textfile(args(0)) //切分压平 val words: rdd[string] = lines.flatmap(_.split(" ")) //把每个单词都变成(单词,1)的元组 val wordandone: rdd[(string,int)] = words.map((_,1)) //然后进行聚合 val reduced: rdd[(string, int)] = wordandone.reducebykey(_+_) //排序 val sorts = reduced.sortby(_._2, false) //保存 sorts.saveastextfile(args(1)) sc.stop() } }
然后把这个程序打包放到集群中
./spark-submit --master spark://hdp-01:7077 --class test.scalawordcount /root/test.jar hdfs://hdp-01:9000/wc hdfs://hdp-01:9000/wcout
上一篇: 让我爸爸也送给你一颗种子
推荐阅读
-
ubuntu中snap包的安装、更新删除与简单使用
-
Hadoop学习(7)-hive的安装和命令行使用和java操作
-
archLinux 学习笔记--mlocate的安装与使用
-
Hadoop学习(9)-spark的安装与简单使用
-
CentOS 7.0 使用 yum 安装 MariaDB 与 MariaDB 的简单配置
-
eclemma怎么安装 eclemma的安装与简单使用图文教程(附下载)
-
Angular4学习之Angular CLI的安装与使用教程
-
ros机器人学习 --- ORB_SLAM的安装与使用
-
Hadoop学习(7)-hive的安装和命令行使用和java操作
-
Nodejs的学习Ⅳ(静态与动态服务器的简单搭建、npm的上传包、在公网上部署服务器、MySQL的安装)