Implementing a single-table column-to-row aggregation with Hadoop MR -- and where the MR System.out output goes
1 Code and business logic:
package mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Input (three tab-separated columns):
 *   a b 1
 *   a b 2
 *   a b 3
 *   c d 4
 *   c d 5
 *   c d 6
 * Expected result:
 *   a b 1,2,3
 *   c d 4,5,6
 * @author zm
 */
public class ConcatWSMapReduce {
    public static class ConcatWSMapper extends Mapper<LongWritable, Text, ConcatWS, Text> {
        /**
         * The map function is called once per input line.
         * @param key   byte offset of the line within the source file
         * @param value the text content of the line
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            System.out.println("splited length is: " + splited.length);
            String col1 = splited[0];
            String col2 = splited[1];
            String col3 = splited[2];
            System.out.println("col1: " + col1 + " col2: " + col2 + " col3: " + col3);
            // The first two columns form the composite map-output key; the third column is the value to concatenate.
            context.write(new ConcatWS(col1, col2), new Text(col3));
        }
    }
    // Grouping: values with the same key are collected together, e.g. <hello,{1,1}> <me,{1}> <you,{1}>.
    // The reduce method is called once per group, so three groups means three reduce calls.
    public static class ConcatWSReducer extends Reducer<ConcatWS, Text, Text, Text> {
        /**
         * The reduce function is called once per group.
         * @param ws  the composite key (col1, col2)
         * @param v2s iterator over all col3 values sharing that key
         */
        @Override
        protected void reduce(ConcatWS ws, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text col3 : v2s) {
                if (sb.length() > 0) {
                    sb.append(","); // separator only between values, so the output has no trailing comma
                }
                sb.append(col3.toString());
            }
            System.out.println("reduce key content: " + ws.toString());
            System.out.println("reduce val content: " + sb.toString());
            context.write(new Text(ws.toString()), new Text(sb.toString()));
        }
    }
    public static void main(String[] args) throws Exception {
        // Set up the Job object
        final Configuration conf = new Configuration();
        final Job job = new Job(conf);
        job.setJobName(ConcatWSMapReduce.class.getSimpleName());
        job.setJarByClass(ConcatWSMapReduce.class);
        // Register the custom mapper and reducer on the Job
        job.setMapperClass(ConcatWSMapper.class);
        job.setReducerClass(ConcatWSReducer.class);
        // Declare the map and reduce output key/value types
        job.setMapOutputKeyClass(ConcatWS.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Set the input data source and the output destination of the Job
        FileInputFormat.addInputPaths(job, "/zmdata/zm.txt"); // note this is addInputPaths, the plural form
        Path outputpath = new Path("/zmdata/zmout");
        FileSystem fs = FileSystem.get(new URI("/"), conf);
        if (fs.exists(outputpath)) {
            // Delete any existing output directory, otherwise the job fails at startup
            fs.delete(outputpath, true);
        }
        FileOutputFormat.setOutputPath(job, outputpath);
        // Run the Job and wait for it to finish
        job.waitForCompletion(true);
    }
    // Note: declare this as a public nested class, otherwise the MR job fails at run time
    // because the framework cannot resolve and instantiate ConcatWS.
    public static class ConcatWS implements WritableComparable<ConcatWS> {
        private String col1 = "";
        private String col2 = "";

        public ConcatWS() {}

        public ConcatWS(String col1, String col2) {
            this.col1 = col1;
            this.col2 = col2;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(col1);
            out.writeUTF(col2);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.col1 = in.readUTF();
            this.col2 = in.readUTF();
        }

        @Override
        public int compareTo(ConcatWS ws) {
            // Sort (and group) by col1 first, then by col2.
            int result = this.col1.compareTo(ws.col1);
            if (result == 0) {
                result = this.col2.compareTo(ws.col2);
            }
            return result;
        }

        // hashCode/equals keep HashPartitioner consistent, so identical keys reach
        // the same reducer when more than one reduce task is used.
        @Override
        public int hashCode() {
            return col1.hashCode() * 31 + col2.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof ConcatWS)) {
                return false;
            }
            ConcatWS other = (ConcatWS) obj;
            return col1.equals(other.col1) && col2.equals(other.col2);
        }

        @Override
        public String toString() {
            return col1 + "\t" + col2;
        }
    }
}
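A minimal sanity check of the custom key can save a cluster round trip: ConcatWS is the only hand-written Writable in the job, so the sketch below (not part of the original article; the class name ConcatWSTest and the sample values are made up here) round-trips one key through write()/readFields() and exercises compareTo. It only needs the Hadoop jars from the lib directory on the classpath.

package mapreduce;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import mapreduce.ConcatWSMapReduce.ConcatWS;
// Hypothetical local check, not part of the original job.
public class ConcatWSTest {
    public static void main(String[] args) throws Exception {
        ConcatWS original = new ConcatWS("a", "b");
        // Serialize through write(), the same path MapReduce uses for map output keys.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        // Deserialize into a fresh instance through readFields().
        ConcatWS copy = new ConcatWS();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println("round trip gives: " + copy);                                  // expected: a<TAB>b
        System.out.println("compareTo after round trip: " + original.compareTo(copy));    // expected: 0
        System.out.println("a/b vs c/d: " + original.compareTo(new ConcatWS("c", "d")));  // expected: negative
    }
}

If the round-tripped key prints back unchanged and the comparison is 0, the key is safe to use as the map output key.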
2 The job is submitted and run from the local machine with an Ant script; the Ant script is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<!-- This file sits at the same level as the src and lib folders -->
<project name="hadoop2-test-project" basedir="." default="sshexec">
<!-- Property settings -->
<property environment="env" />
<property file="build.properties" />
<property name="src.dir" value="${basedir}/src" />
<property name="java.lib.dir" value="${env.JAVA_HOME}/lib" />
<property name="classes.dir" value="${basedir}/classes" />
<property name="dist.dir" value="${basedir}/dist" />
<property name="project.lib.dir" value="${basedir}/lib" />
<property name="localpath.dir" value="${basedir}" />
<property name="remote.home" value="~"/>
<!--可以修改:hadoop集群的hostname或者ip-->
<property name="remote.hostname" value="hadoop3"/>
<!--可以修改:登录hadoop集群所在linux的用户名-->
<property name="remote.username" value="root"/>
<!--可以修改:登录hadoop集群所在liniux的密码-->
<property name="remote.password" value="123456"/>
<!--可以修改:每次需要运行的main类,写到这里。运行时拼接为hadoop jar xxx.jar MainClass -->
<property name="main.class" value="mapreduce.ConcatWSMapReduce"/>
<!--可以修改:hadoop集群在linux的部署路径-->
<property name="hadoop.path" value="/opt/hadoop-2.5.2"/>
<!-- Basic compile classpath settings -->
<path id="compile.classpath">
<fileset dir="${java.lib.dir}">
<include name="tools.jar" />
</fileset>
<fileset dir="${project.lib.dir}">
<include name="*.jar" />
</fileset>
</path>
<!-- Run classpath settings -->
<path id="run.classpath">
<path refid="compile.classpath" />
<pathelement location="${classes.dir}" />
</path>
<!-- Clean: delete temporary directories -->
<target name="clean" description="Clean: delete temporary directories">
<!--delete dir="${build.dir}" /-->
<delete dir="${dist.dir}" />
<delete dir="${classes.dir}" />
<echo level="info">Clean finished</echo>
</target>
<!-- Init: create directories and copy files -->
<target name="init" depends="clean" description="Init: create directories and copy files">
<mkdir dir="${classes.dir}" />
<mkdir dir="${dist.dir}" />
</target>
<!-- Compile the source files -->
<target name="compile" depends="init" description="Compile the source files">
<javac srcdir="${src.dir}" destdir="${classes.dir}" source="1.7" target="1.7" includeAntRuntime="false">
<classpath refid="compile.classpath" />
<compilerarg line="-encoding UTF-8 "/>
</javac>
</target>
<!-- Package the class files into a jar -->
<target name="jar" depends="compile" description="Package the class files into a jar">
<jar jarfile="${dist.dir}/jar.jar">
<fileset dir="${classes.dir}" includes="**/*.*" />
</jar>
</target>
<!-- Upload to the server.
**Copy jsch-0.1.51 from the lib directory into $ANT_HOME/lib; if you run Ant inside Eclipse, jsch-0.1.51 must be added under Window->Preferences->Ant->Runtime->Classpath.
-->
<target name="ssh" depends="jar">
<scp file="${dist.dir}/jar.jar" todir="${remote.username}@${remote.hostname}:${remote.home}" password="${remote.password}" trust="true"/>
</target>
<target name="sshexec" depends="ssh">
<sshexec host="${remote.hostname}" username="${remote.username}" password="${remote.password}" trust="true" command="${hadoop.path}/bin/hadoop jar ${remote.home}/jar.jar ${main.class}"/>
</target>
</project>
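Because the project's default target is sshexec, running a plain "ant" in the project root walks the whole chain clean -> init -> compile -> jar -> ssh -> sshexec: it compiles the sources against the jars in lib, packages them into dist/jar.jar, copies the jar to ~ on hadoop3 over scp, and finally runs /opt/hadoop-2.5.2/bin/hadoop jar ~/jar.jar mapreduce.ConcatWSMapReduce on the cluster node.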
3 The MR code above prints with System.out, so where does that data end up? It does not come back to the console that submitted the job: each map and reduce task writes its System.out output to the stdout log of its own task attempt. Open the cluster's web UI (ResourceManager / job history page), find the ID of the MR job you just ran, and drill into it: under each map or reduce task attempt's logs, the stdout section contains the lines printed above ("splited length is: ...", "col1: ...", "reduce key content: ..."). The same files also live on the node that ran the task, under the NodeManager's container log directory (typically a userlogs directory), and if log aggregation is enabled, "yarn logs -applicationId <application id>" collects them after the job finishes.
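A task attempt's logs have three sections: stdout (where System.out ends up), stderr, and syslog (where log4j/commons-logging output from the framework and from user code ends up). If you would rather see the debug messages next to the framework's own log lines, a variant like the following is possible; this is a sketch that is not part of the original article, and the class name LoggingConcatWSMapper is made up here:

package mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import mapreduce.ConcatWSMapReduce.ConcatWS;
// Hypothetical variant of the mapper, not part of the original article:
// messages written through commons-logging appear in the task attempt's syslog file.
public class LoggingConcatWSMapper extends Mapper<LongWritable, Text, ConcatWS, Text> {
    private static final Log LOG = LogFactory.getLog(LoggingConcatWSMapper.class);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] splited = value.toString().split("\t");
        LOG.info("splited length is: " + splited.length); // goes to syslog rather than stdout
        context.write(new ConcatWS(splited[0], splited[1]), new Text(splited[2]));
    }
}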