MapReduce入门
在我们开始编写MapReduce程序之前,我们需要设置和配置开发环境。
1、配置文件:
-
configuration1.xml
<?xml version="1.0"?>
<configuration>
<property>
<name>color</name>
<value>yellow</value>
<description>Color</description>
</property>
<property>
<name>size</name>
<value>10</value>
<description>Size</description>
</property>
<property>
<name>weight</name>
<value>heavy</value>
<final>true</final>
<!-- 注意:被标记为final的定义,不会在后面被覆盖,比如:编写一个configuration2.xml ,通过conf一次加载,#<property>
#<name>size</name> 会被覆盖
#<value>12</value>
#</property>
#<property>
#<name>weight</name> 不会被覆盖
#<value>light</value>
#</property> -->
<description>Weight</description>
</property>
<property>
<name>size-weight</name>
<value>${size},${weight}</value>
<description>Size and weight</description>
</property>
</configuration>
Configuration conf = new Configuration();
conf.addResource("configuration1.xml");
conf.addResource("configuration2.xml");
assertThat(conf.get("color"), is("yellow"));
assertThat(conf.getInt("size", 0), is(10));
assertThat(conf.get("breadth", "wide"), is("wide"));
2、开发环境配置(第一步):构建和测试MapReduce程序所需的依赖关系--maven(pom.xml):
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.hadoopbook</groupId>
<artifactId>hadoop-book-mr-dev</artifactId>
<version>4.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.5.1</hadoop.version>
</properties>
<dependencies>
<!-- Hadoop main client artifact -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- Unit test artifacts -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- for writing MapReduce tests -->
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.1.0</version>
<classifier>hadoop2</classifier>
<scope>test</scope>
</dependency>
<!-- Hadoop test artifact for running mini clusters -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>hadoop-examples</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.5</version>
<configuration>
<outputDirectory>${basedir}</outputDirectory>
</configuration>
</plugin>
</plugins>
</build>
</project>
3、 在开发Hadoop应用程序时,通常在本地运行应用程序和在集群上运行应用程序之间切换。事实上,您可能有几个与您一起工作的集群,或者您可能有一个您喜欢测试的本地“伪分布”集群, 适应这些变化的一种方法是拥有Hadoop configuration文件,其中包含您所运行的每个集群的连接设置,并指定在运行Hadoop应用程序或工具时正在使用哪个集群。作为最佳实践,建议将这些文件保存在Hadoop的安装目录树之外,因为这样可以很容易地在Hadoop版本之间切换,而不需要复制或丢失设置。使用某个配置文件:hadoop fs -conf conf/hadoop-localhost.xml -ls 。 如果您省略了-conf选项,那么您将在 $HADOOP_HOME下的etc/Hadoop子目录中获取Hadoop配置。或者,如果设置了HADOOP_CONF_DIR,那么将从该位置读取Hadoop配置文件。
-
hadoop-local.xml(名字自定义),包含默认文件系统的默认Hadoop配置,以及用于运行MapReduce作业的本地(in-JVM)框架:
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>file:///</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>local</value>
</property>
</configuration>
-
hadoop-localhost.xml, 指向namenode和yarn资源管理器,它们都在本地主机上运行:
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost/</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>localhost:8032</value>
</property>
</configuration>
-
hadoop-cluster.xml, 包含集群的namenode和yarn资源管理器地址的详细信息:
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://namenode/</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>resourcemanager:8032</value>
</property>
</configuration>
-
Hadoop附带了一些辅助类,使其更容易从命令行运行作业:
4、对MR的单元测试编写:
-
针对map的测试
1、测试类
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.*;
public class MaxTemperatureMapperTest {
@Test
public void processesValidRecord() throws IOException, InterruptedException {
Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
// Year ^^^^
"99999V0203201N00261220001CN9999999N9-00111+99999999999");
// Temperature ^^^^^
new MapDriver<LongWritable, Text, Text, IntWritable>()
.withMapper(new MaxTemperatureMapper())
.withInput(new LongWritable(0), value)
.withOutput(new Text("1950"), new IntWritable(-11))
.runTest();
}
}
2、map类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
parser.parse(value);
if (parser.isValidTemperature()) {
context.write(new Text(parser.getYear()),
new IntWritable(parser.getAirTemperature()));
}
}
}
5、在集群上运行
-
将文件打包成jar,并放到集群上。
-
通过hadoop jar <*****.jar>客户端类路径。
-
用户的任务路径可以由classpath指定,而集群的map和reduce不能被其控制。
-
包依赖
-
优先级
-
例子:% unset HADOOP_CLASSPATH
-
% hadoop jar hadoop-examples.jar v2.MaxTemperatureDriver -conf conf/hadoop-cluster.xml input/ncdc/all max-temp
-
解释:yarn ID号有yarn提供,格式为:时间(1410450250506)和递增的计数器(后者达到 10000后不会重置,而是增加位数)。map job ID 通过修改yarn ID 的前缀;作业由许多任务组成,其ID由修改作业的前缀和增加后缀组成。
6、管理页面:http:// resource-manager-host :8088/ :包括作业的进度、统计、日志等
-
作业被以json格式存储在hdfs中,一周后删除。
-
主表显示了运行或正在集群上运行的所有应用程序。有一个搜索框,用于过滤应用程序以找到您感兴趣的应用程序。主视图可以显示每个页面的100个条目,并且资源管理器一次可以在内存中保存多达10000个已完成的应用程序( set by yarn.resourcemanager.max-completed-applications )。还要注意的是,作业历史是持久性的,所以您可以从资源管理器的以前的运行中找到工作。
-
通过Tracking UI监控job的运行情况。
-
检索结果:运行结果存储在 part-r-00000 to part-r-00029的而文件中(每一个reduce有一个输出文件,输出文件包含30个part)。
-
例子:% hadoop fs -getmerge max-temp max-temp-local
-
% sort max-temp-local | tail
-
% hadoop fs -cat max-temp/*
-
寻找错误:可以通过打印提示
7、日志:http:// node-manager-host :8042/logs/userlogs
-
设置 yarn.log-aggregation-enable to true,启动该服务
-
UI 和mapred job -logs 查看日志。
-
通过设置mapreduce.map.log.level or mapreduce.reduce.log.level来记录不同级别的日志信息。
-
yarn.nodemanager.log.retain-seconds、mapreduce.task.userlog.limit.kb设置日志的存储时间和大小,默认为不限制大小3小时后删除
-
常见错误:JVM的内存错误。 解决办法:设置mapred.child.java.opts to include -XX:-HeapDumpOnOutOfMemoryError -
-
XX:HeapDumpPath=/path/to/dumps,产生一个堆转储,可以用jhat或Eclipse Memory Analyzer等工具对其进行检查。请注意,应该将JVM选项添加到mapred.child.java.opts现有内存设置中。
-
对于一个有错误的任务,通常采用保存中间文件的方法。通过设置 mapreduce.task.files.preserve.failedtasks to true。当然也可以保存成功任务,通过设置mapreduce.task.files.preserve.filepattern to a regular expression that matches the IDs of the tasks whose files you want to keep。
-
另一个调试的方法是设置 yarn.nodemanager.delete.debug-delay-sec, 等待删除本地化任务尝试文件的秒数,增大时间,可以有足够的时间查看这些文件,然后再删除它们。
8、调优
-
通过分析任务,最好的是放到本地,用足够多的数据来查看map和reduce的性能。注意如果是大量的I/O,那么针对CPU的调优一边是没有效果的。同时放在本地和集群上跑会有一定差别,比如集群的调用等等。
-
HPROF分析器:通过设置 mapreduce.task.profile to true(我也没看懂)
9、MapReduce Workflows
将数据处理模型转化为MapReduce模型,如果比较复杂,考虑的应该是增加job,而不是将map和reduce变复杂。如果超级复杂,就交给Pig, Hive, Cascading, Crunch, or Spark。
-
mapper通常执行输入格式解析、投影(选择相关字段)和过滤(删除不感兴趣的记录)。可以运行一系列mapper和一个reduce在一个单独的MapReduce job中。
-
当有多个job在MapReduce workflow中时,最简但的方法是线性执行--JobClient.runJob(conf1);JobClient.runJob(conf2);任务出错,runjob会抛出IOException异常。
-
org.apache.hadoop.mapreduce.jobcontrol 运用在复杂的work上。
-
oozie(我之前的文章有介绍哦。):由两部分组成, 一个workflow,它存储和运行由不同类型的Hadoop作业(MapReduce、Pig、Hive等)组成的工作流,以及一个基于预定义的时间表和数据可用性运行工作流作业的coordinator。
9、oozie workflow
<workflow-app xmlns="uri:oozie:workflow:0.1" name="max-temp-workflow">
<start to="max-temp-mr"/>
<action name="max-temp-mr">
<map-reduce>
<job-tracker>${resourceManager}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/user/${wf:user()}/output"/>
</prepare>
<configuration>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapreduce.job.map.class</name>
<value>MaxTemperatureMapper</value>
</property>
<property>
<name>mapreduce.job.combine.class</name>
<value>MaxTemperatureReducer</value>
</property>
<property>
<name>mapreduce.job.reduce.class</name>
<value>MaxTemperatureReducer</value>
</property>
<property>
<name>mapreduce.job.output.key.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapreduce.job.output.value.class</name>
<value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.inputdir</name>
<value>/user/${wf:user()}/input/ncdc/micro</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.outputdir</name>
<value>/user/${wf:user()}/output</value>
</property>
</configuration>
</map-reduce>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>MapReduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
</message>
</kill>
<end name="end"/>
</workflow-app>
目录结构:
运行方法:
1、先确定使用哪台oozie服务器(export OOZIE_URL="http://localhost:11000/oozie")
2、oozie job -config ch06-mr-dev/src/main/resources/max-temp-workflow.properties \
-run
3、其中max-temp-workflow.properties包含了:
nameNode=hdfs://localhost:8020
resourceManager=localhost:8032
oozie.wf.application.path=${nameNode}/user/${user.name}/max-temp-workflow
4、查看job信息:oozie job -info 0000001-140911033236814-oozie-oozi-W,在UI(http://localhost:11000/oozie)也可以查看
上一篇: Java不可变类机制浅析
下一篇: Java面试题之基本语法(图解)