MapReduce Classic Case: Data Deduplication


The full project code, the two test files, and the jar packages required for the Maven environment are available at:
Link: https://pan.baidu.com/s/10MEBxCfy_ShTaCf_RhGzMw
Extraction code: 1314

  1. First, set up the Hadoop development environment on Windows; see https://blog.csdn.net/weixin_42693712/article/details/108796708
  2. Open Eclipse on your local machine and create a Maven project
  • Choose File -> New -> Maven Project to create the project, check the "Create a simple project" option, and click Next. Set GroupId to com.itcast and ArtifactId to HadoopDemo; the other fields can keep their defaults.
  • Edit pom.xml to add the dependencies below. Once they resolve successfully, the required jar packages appear automatically under Maven Dependencies.
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.itcast</groupId>
	<artifactId>HadoopDemo</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.7.4</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>2.7.4</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.7.4</version>
		</dependency>

		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>RELEASE</version>
		</dependency>

		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>

	</dependencies>
</project>
  • Create the com.itcast.hdfsdemo package under src and write the HDFS_CRUD class to initialize the HDFS client object
package com.itcast.hdfsdemo;

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Before;
import org.junit.Test;

public class HDFS_CRUD {
	FileSystem fs = null;
	@Before
	public void init() throws Exception{
		// create the configuration object
		Configuration conf = new Configuration();
		// set the HDFS URL to access (must match fs.defaultFS in core-site.xml)
		conf.set("fs.defaultFS","hdfs://gaoyu:9000");
		// set the client user
		System.setProperty("HADOOP_USER_NAME", "root");
		// obtain the client object via the static factory method
		fs = FileSystem.get(conf);
	}
	
	@Test
	public void testAddFileToHdfs() throws IOException{
		// local path of the file to upload
		Path src = new Path("D:/test.txt");
		// target path on HDFS
		Path dst = new Path("/testFile");
		// upload the file
		fs.copyFromLocalFile(src, dst);
		// release resources
		fs.close();
	}
	
	@Test
	public void testDownloadFileTolocal() throws IllegalArgumentException,IOException{
		// download the file from HDFS to the local disk
		fs.copyToLocalFile(new Path("/testFile"), new Path("D://"));
		fs.close();
	}
	
	// create, rename, and delete directories
	@Test
	public void testMkdirAndDeleteAndRename() throws Exception{
		// create directories
		fs.mkdirs(new Path("/a/b/c"));
		fs.mkdirs(new Path("/a2/b2/c2"));
		// rename a file or directory
		fs.rename(new Path("/a"), new Path("/a3"));
		// delete a directory recursively
		fs.delete(new Path("/a2"),true);
		
	}
	
	@Test
	public void testListFiles() throws FileNotFoundException,IllegalArgumentException,IOException{
		// get a recursive iterator over the files under the root directory
		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
		while(listFiles.hasNext()) {
			LocatedFileStatus fileStatus = listFiles.next();
			// print the current file name
			System.out.println(fileStatus.getPath().getName());
			// print the current file block size
			System.out.println(fileStatus.getBlockSize());
			// print the current file permissions
			System.out.println(fileStatus.getPermission());
			// print the current file length
			System.out.println(fileStatus.getLen());
			// get the block location information
			BlockLocation[] blockLocations = fileStatus.getBlockLocations();
			for(BlockLocation b1 : blockLocations) {
				System.out.println("block-length:"+b1.getLength()+"--"+"block-offset:"+b1.getOffset());
				String[] hosts = b1.getHosts();
				for(String host : hosts) {
					System.out.println(host);
				}
			}
			System.out.println("------------- separator --------------------");
		}
		}
	}
	

}
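
As an optional extension that is not part of the original class, a test method along the following lines could be added to HDFS_CRUD to read /testFile back from HDFS and print its contents; it relies on the same init() setup and assumes /testFile already exists (for example, created by testAddFileToHdfs):

	@Test
	public void testReadFileFromHdfs() throws IOException {
		// open the file on HDFS and stream its bytes to standard output
		org.apache.hadoop.fs.FSDataInputStream in = fs.open(new Path("/testFile"));
		try {
			org.apache.hadoop.io.IOUtils.copyBytes(in, System.out, 4096, false);
		} finally {
			org.apache.hadoop.io.IOUtils.closeStream(in);
			fs.close();
		}
	}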

Note:
gaoyu:9000 is the fs.defaultFS value you configured yourself in core-site.xml; check that file to confirm it.
The output directory does not need to be created locally because it is generated automatically. The input directory must be created by yourself at the corresponding path (mine is D:\Dedup\input), and it must match the path used in the code.
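
The actual test files come from the download link above and are not reproduced here, but the comments in the DedupMapper class below suggest plain text with one "date letter" record per line (for example "2018-3-3 c"). As a hedged convenience sketch, not part of the original tutorial, the standalone class below creates two hypothetical sample files in that format under D:\Dedup\input so the job has overlapping lines to deduplicate:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class SampleInputWriter {
	public static void main(String[] args) throws IOException {
		// create the input directory expected by DedupRunner
		Path dir = Paths.get("D:\\Dedup\\input");
		Files.createDirectories(dir);
		// the two files deliberately share some lines, so the job has duplicates to remove
		Files.write(dir.resolve("file1.txt"),
				Arrays.asList("2018-3-1 a", "2018-3-2 b", "2018-3-3 c", "2018-3-4 d"));
		Files.write(dir.resolve("file2.txt"),
				Arrays.asList("2018-3-3 c", "2018-3-4 d", "2018-3-5 e"));
	}
}

With input like this, the deduplicated result should list each distinct line exactly once.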

  • Create the cn.itcast.mr.dedup package and, under that path, write the custom Mapper class DedupMapper, the custom Reducer class DedupReducer, and the driver class DedupRunner

The DedupMapper class

package cn.itcast.mr.dedup;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

	private static Text field = new Text();
	// example map input key-value pairs: <0, 2018-3-3 c>, <11, 2018-3-4 d> (key = byte offset, value = line)
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// the whole line becomes the output key; the value carries no information, so NullWritable is used
		field.set(value.toString());
		context.write(field, NullWritable.get());
	}
	// example map output key-value pairs: <2018-3-3 c, null>, <2018-3-4 d, null>
}

The DedupReducer class

package cn.itcast.mr.dedup;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
	// example reduce input after the shuffle: <2018-3-3 c, [null]>, <2018-3-4 d, [null, null]>
	@Override
	protected void reduce(Text key, Iterable<NullWritable> values, Context context)
			throws IOException, InterruptedException {
		// duplicate records share the same key, so writing each key exactly once removes the duplicates
		context.write(key, NullWritable.get());
	}
}
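
Because the reducer does nothing except emit each distinct key once, it can also be reused as a combiner so duplicates are already removed on the map side before the shuffle. This is an optional tweak, not part of the original tutorial, and would amount to one extra line in DedupRunner:

		job.setReducerClass(DedupReducer.class);
		// optional: also run DedupReducer as a combiner to shrink the shuffled data
		job.setCombinerClass(DedupReducer.class);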

The DedupRunner class

package cn.itcast.mr.dedup;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DedupRunner {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(DedupRunner.class);
		job.setMapperClass(DedupMapper.class);
		job.setReducerClass(DedupReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.setInputPaths(job, new Path("D:\\Dedup\\input"));
		// specify where the results are saved after the job finishes
		FileOutputFormat.setOutputPath(job, new Path("D:\\Dedup\\output"));

		job.waitForCompletion(true);

	}
}
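
One practical pitfall: the job fails to start if the output path already exists, so re-running it requires deleting D:\Dedup\output first. As a hedged optional tweak, not in the original code, the output directory could be removed programmatically in main() before setting the output path:

		// delete the output directory if it already exists, so the job can be re-run
		Path output = new Path("D:\\Dedup\\output");
		org.apache.hadoop.fs.FileSystem fs = output.getFileSystem(conf);
		if (fs.exists(output)) {
			fs.delete(output, true); // recursive delete
		}
		FileOutputFormat.setOutputPath(job, output);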

  • After the code is written, open the virtual machine and start all Hadoop daemons with start-all.sh
  • Then run the driver class DedupRunner and check the result in the output directory
    Opening the file part-r-00000 shows that the deduplication succeeded.