MapReduce学习笔记
程序员文章站
2022-03-16 10:57:02
...
MapReduce是一个用于处理海量数据的分布式计算框架
自身不存储数据,数据在HDFS上
解决了:
- 数据分布式存储
- 作业调度
- 容错
- 机器间通信等复杂问题
MapReduce简单数据处理流程介绍:
HDFS组件、总体采用master/slave架构:
- Client:通过与NameNode和DataNode交互访问HDFS中的文件。
- NameNode:系统"总管",负责管理HDFS目录树和相关文件元数据信息。信息以"fsimage"(元数据镜像文件),"editlog"(HDFS改动日志)两个文件形式存在本地磁盘。还负责监控DataNode状态,若DN挂掉,则移除并重新备份上面的数据。
- SecondaryNameNode:定期合并fsimage和edits,并传输给NameNode。
- DataNode:负责实际的数据存储,并将信息汇报给NameNode。以Block为基本单位(默认64M)。
MapReduce架构:
MapReduce处理单位 :split
- Client:可以利用接口查看作业运行状态。
- 2.JobTracker:负责资源监控和作业调度。
- TaskTracker:周期性的通过心跳方式将资源使用情况和作业运行状态汇报给Jobtracker,同时接受命令并执行操作。TaskTracker使用“slot”等量划分本节点的资源量。"slot"代表计算资源,一个task获取到一个slot后才有机会运行。
- Task:分为Map task和Reduce Task,由TaskTracker启动。
Map Task过程:
- split ——>决定Map Task个数
- 调用map()函数
- 分成若干个partition,每一个partition被一个Reduce Task处理。
Reduce Task过程:
- Shuffle阶段,读取中间数据
- Sort阶段,对key排序
- Reduce阶段,合并key相同的value,最终输出到HDFS上。
例子:简单wordcount
在hadoop 1.0集群中
map.py
import sys
for line in sys.stdin:
ss = line.strip().split(' ')
for word in ss:
print '\t'.join([word.strip(), '1'])
red.py
import sys
cur_word = None
sum = 0
for line in sys.stdin:
ss = line.strip().split('\t')
if len(ss) != 2:
continue
word, cnt = ss
if cur_word == None:
cur_word = word
if cur_word != word:
print '\t'.join([cur_word, str(sum)])
cur_word = word
sum = 0
sum += int(cnt)
print '\t'.join([cur_word, str(sum)])
run.sh 启动脚本
HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_FILE_PATH="/1.data"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python red.py" \
-file ./map.py \
-file ./red.py
- bash run.sh
- hadoop fs -ls /output
- hadoop fs -text /output/part-00000 | head
作业结果如下,启动脚本之前需要把文件1.data上传到hdfs中:hadoop fs -put 1.data /
上一篇: TP5实现表格拖动排序并保存到数据库的方法(附代码)
下一篇: keepalived工作原理是什么