MapReduce Classic Case: Data Deduplication
Complete project code, the two test files, and the jar packages needed for the Maven environment:
Link: https://pan.baidu.com/s/10MEBxCfy_ShTaCf_RhGzMw
Extraction code: 1314
- First, set up the development environment on Windows; see https://blog.csdn.net/weixin_42693712/article/details/108796708
- Open Eclipse on your machine and create a Maven project: choose File -> New -> Maven Project, select the Create a simple project option, and click Next. Enter com.itcast as the Group Id and HadoopDemo as the Artifact Id; leave everything else at the defaults.
- Edit pom.xml and add the dependencies below. Once they resolve successfully, the jar packages appear automatically under Maven Dependencies.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.itcast</groupId>
  <artifactId>HadoopDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.4</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>RELEASE</version>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
  </dependencies>
</project>
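To confirm the dependencies actually resolved, a quick sanity check can help. This is a minimal sketch of my own (not part of the original project): if the Hadoop jars are on the classpath, it prints the client library version, which should be 2.7.4 with the pom above.

import org.apache.hadoop.util.VersionInfo;

// Dependency sanity check (illustrative sketch, not from the original post):
// prints the version of the Hadoop client classes found on the classpath.
public class CheckHadoopDeps {
    public static void main(String[] args) {
        System.out.println("Hadoop version: " + VersionInfo.getVersion());
    }
}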
- Under src, create the package com.itcast.hdfsdemo and, in it, an HDFS_CRUD class that initializes the client object.
package com.itcast.hdfsdemo;

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Before;
import org.junit.Test;

public class HDFS_CRUD {
    FileSystem fs = null;

    @Before
    public void init() throws Exception {
        Configuration conf = new Configuration();
        // point the client at HDFS (must match fs.defaultFS in core-site.xml)
        conf.set("fs.defaultFS", "hdfs://gaoyu:9000");
        // identify the client as the root user
        System.setProperty("HADOOP_USER_NAME", "root");
        // obtain the client object via the static factory method
        fs = FileSystem.get(conf);
    }

    @Test
    public void testAddFileToHdfs() throws IOException {
        // local path of the file to upload
        Path src = new Path("D:/test.txt");
        // target path on HDFS
        Path dst = new Path("/testFile");
        // upload the file
        fs.copyFromLocalFile(src, dst);
        // release resources
        fs.close();
    }

    @Test
    public void testDownloadFileTolocal() throws IllegalArgumentException, IOException {
        // download the file
        fs.copyToLocalFile(new Path("/testFile"), new Path("D://"));
        fs.close();
    }

    // create, delete, and rename files or directories
    @Test
    public void testMkdirAndDeleteAndRename() throws Exception {
        // create directories
        fs.mkdirs(new Path("/a/b/c"));
        fs.mkdirs(new Path("/a2/b2/c2"));
        // rename a file or directory
        fs.rename(new Path("/a"), new Path("/a3"));
        // delete a directory recursively
        fs.delete(new Path("/a2"), true);
    }

    @Test
    public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
        // get an iterator over all files under the root directory, recursively
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();
            // print the current file's name
            System.out.println(fileStatus.getPath().getName());
            // print the current file's block size
            System.out.println(fileStatus.getBlockSize());
            // print the current file's permissions
            System.out.println(fileStatus.getPermission());
            // print the current file's content length
            System.out.println(fileStatus.getLen());
            // get the file's block information
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            for (BlockLocation bl : blockLocations) {
                System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
                String[] hosts = bl.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
            System.out.println("-------------divider--------------------");
        }
    }
}
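The class above covers upload, download, directory operations, and listing. As a small companion sketch (my addition, not from the original post, assuming the /testFile uploaded earlier still exists), reading a file's contents straight from HDFS looks like this. It would live in the same class and reuse the fs field from init(); FSDataInputStream and IOUtils are written fully qualified so no extra imports are needed.

@Test
public void testReadFileFromHdfs() throws IOException {
    // open() returns an input stream positioned at the start of the file
    org.apache.hadoop.fs.FSDataInputStream in = fs.open(new Path("/testFile"));
    // copy the stream to stdout; 4096 is the buffer size, false leaves the stream open
    org.apache.hadoop.io.IOUtils.copyBytes(in, System.out, 4096, false);
    in.close();
    fs.close();
}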
Note:
gaoyu:9000 is the address you configured yourself as fs.defaultFS in core-site.xml; check your own setting.
The output directory does not need to be created locally (it is generated automatically), but the input directory must be created at the corresponding path beforehand. Mine is D:\Dedup\input, and it must match the path used in the code.
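The actual test files are in the pan link at the top and are not reproduced here. Purely for illustration (the concrete contents are my assumption, based on the format shown in the Mapper comments below), the files under D:\Dedup\input could look like:

2018-3-3 c
2018-3-4 d
2018-3-4 d
2018-3-3 c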
- Create the package cn.itcast.mr.dedup and, under it, write the custom Mapper class DedupMapper, the custom Reducer class DedupReducer, and the driver class DedupRunner.
DedupMapper class

package cn.itcast.mr.dedup;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private static Text field = new Text();

    // input:  <0, 2018-3-3 c> <11, 2018-3-4 d>
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // the whole line becomes the output key; the value carries no information
        field.set(value.toString());
        context.write(field, NullWritable.get());
    }
    // output: <2018-3-3 c, null> <2018-3-4 d, null>
}
DedupReducer class

package cn.itcast.mr.dedup;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    // input: <2018-3-3 c, null> <2018-3-4 d, null> <2018-3-4 d, null>
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // identical lines arrive grouped under one key, so writing the key once removes duplicates
        context.write(key, NullWritable.get());
    }
}
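To make the mechanism concrete, here is how the illustrative records above would flow through the job. The map phase turns every line into a key with a null value; the shuffle phase then groups identical keys, so each distinct line reaches reduce exactly once and is written exactly once (the byte offsets assume the sample data shown earlier):

map input:     <0, 2018-3-3 c>  <11, 2018-3-4 d>  <22, 2018-3-4 d>  <33, 2018-3-3 c>
map output:    <2018-3-3 c, null>  <2018-3-4 d, null>  <2018-3-4 d, null>  <2018-3-3 c, null>
after shuffle: <2018-3-3 c, [null, null]>  <2018-3-4 d, [null, null]>
reduce output: 2018-3-3 c
               2018-3-4 d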
DedupRunner class

package cn.itcast.mr.dedup;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DedupRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DedupRunner.class);
        job.setMapperClass(DedupMapper.class);
        job.setReducerClass(DedupReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // directory containing the input files
        FileInputFormat.setInputPaths(job, new Path("D:\\Dedup\\input"));
        // where the results are saved after processing (must not exist yet)
        FileOutputFormat.setOutputPath(job, new Path("D:\\Dedup\\output"));
        job.waitForCompletion(true);
    }
}
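The input and output paths are hardcoded for a local run. If you later package the job and submit it with hadoop jar on the cluster, a common variant (my sketch, not part of the original code) takes the paths from the command line and propagates the job status as the exit code:

// Cluster-friendly variant (sketch): args[0] = input dir, args[1] = output dir (must not exist yet)
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);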
- After the code is written, open the virtual machine and start all the Hadoop service processes:
start-all.sh
You can confirm the daemons are up with jps; on a typical setup it should list NameNode, DataNode, SecondaryNameNode, ResourceManager, and NodeManager (the exact list depends on your configuration).
- Then run the main class DedupRunner and check the results under the output directory.
Opening part-r-00000 shows that the deduplication succeeded.
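With the illustrative input shown earlier, part-r-00000 would contain each distinct line exactly once:

2018-3-3 c
2018-3-4 d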