欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

大数据去除重复--实战(二)

程序员文章站 2022-04-22 18:08:43
...

           关于上一篇数据去重复的问题,在结尾的时候提到,另一种思路:在url-->hashCode 根据范围写入文件的时候,不用迭代二分法,采用平均算法,也就是说根据url的大概行数,设置一个单位区间,循环遍历行的时候,根据hashCode 值,放入不同的空间,然后再放入内存去除重复,写入汇总文件。

          去个例子,我文件数据2G,1.5亿行,自己设定一个区间值1000W。 也就是说0~1000W、1000W~2000W 。。。为一个区间,每行都会通过计算放入不同的区间,这样在数据均匀的情况下,分布式很均匀的,如果某一个区间值过多,超过内存限制,继续切割,这样会大大减少第一种算法带来的文件数据的重复读取的效率。

          看代码,这里生产文件的代码参考前一篇文章。

          

        

package com.files;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

public class PartFile2 {
	// 内存监控
	final static Runtime currRuntime = Runtime.getRuntime ();
	// 最小的空闲空间,额,可以用来 智能控制,- -考虑到GC ,暂时没用
	final static long MEMERY_LIMIT = 1024 * 1024 * 3;
	// 内存限制,我内存最多容纳的文件大小
	static final long FILE_LIMIT_SIZE = 1024*1024*1;
	// 文件写入缓冲区 ,我默认
	static final int CACHE_SIZE = 1024;
	// 默认文件后缀
	static final String FILE_SUFFIX = ".txt";
	// 临时分割的文件目录,可以删除~。~
	static final String FILE_PREFIX = "test/";
	// 汇总的文件名
	static final String REQUST_FILE_NAME = "resultFile.txt";

	
	// 存放小文件的,驱除重复数据
	static Map<String,String> fileLinesMap = new HashMap<String,String>(10000);
	// 按hashCode 分割的单位
	static int HASH_UNIT = 1000*1000*10;
	// 收缩率,HASH_UNIT 在一定范围内,数据过大,用这个调节之后继续分割
	static final int shrinkage_factor = 1000;
	// 存放分割d
	static Map<File,BufferedWriter> map = new HashMap<File, BufferedWriter>();
	
	// 存放大文件 引用,以及分割位置
	static List<ChildFile> bigChildFiles = new ArrayList<ChildFile>();
	
	static final String origFileName = "xxx.txt";

	public static void main(String[] args) {
		long begin = System.currentTimeMillis();
		new PartFile2().partFile(new File(origFileName));
		long result = System.currentTimeMillis()-begin;
		System.out.println("除去重复时间为:"+result +" 毫秒");
		// 除去重复时间为:931594 毫秒
	}
     
	
	// 按hashCode 范围分割
	public  void partFile(File origFile) {
		String line = null;
		BufferedReader reader = null;
		int hashCode;
		try {
			reader = new BufferedReader(new FileReader(origFile));
			while ((line = reader.readLine()) != null) {
				hashCode = line.hashCode(); 
				File file = getFileByHashCode(hashCode);
				writeToFile(getFileWriterByHashCode(hashCode,file), line);
			}
			// 将集合对象可以内存排序的全部写入汇总文件,大于内存的继续分割
			Iterator<Entry<File, BufferedWriter>> it = map.entrySet().iterator();
			while(it.hasNext()){
				Entry<File, BufferedWriter> entry = it.next();
				File file = entry.getKey();
				BufferedWriter writer = entry.getValue();
				if(isSurpassFileSize(file)){
					// 这里本想用再次细化空间,进行分割,发现当大量重复的元素的时候会溢出
					// 极端的情况下,全是重复元素,即使以前的方法也不能执行,需要另外控制
					// HASH_UNIT = HASH_UNIT/shrinkage_factor;
					// partFile(file);
					
					// 如果超出,继续分割
					String[] number = file.getName().split("_");
					long maxNum = Long.parseLong(number[0])*HASH_UNIT;
					long minNum = Long.parseLong(number[1].substring(0, number[1].indexOf(FILE_SUFFIX)))*HASH_UNIT;
					partFile(file,maxNum,minNum);
				}else{
					writer.flush();
					orderAndWriteToFiles(file);
					// 关闭刷新流
					closeWriter(writer);
					file.delete();
				}
			}
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			closeReader(reader);
			shotDown();
		}
	}
	
	// 清除临时文件
	public void shotDown(){
		Iterator<File> it = map.keySet().iterator();
		while(it.hasNext()){
			File file = it.next();
			file.delete();
		}
	}
	
	// 按hashCode 范围分割
	public  void partFile(File origFile,long maxNum,long minNum) {
		String line = null;
		long hashCode = 0;
		long max_left_hashCode = 0;
		long min_left_hashCode = 0;
		long max_right_hashCode = 0;
		long min_right_hashCode = 0;
		BufferedWriter rightWriter = null;
		BufferedWriter leftWriter = null;
		BufferedReader reader = null;
		try {
			reader = new BufferedReader(new FileReader(origFile));
			long midNum = (maxNum+minNum)/2;
			// 以文件hashCode 范围作为子文件名
			File leftFile = new File(FILE_PREFIX+minNum+"_"+midNum+FILE_SUFFIX);
			File rightFile = new File(FILE_PREFIX+midNum +"_"+maxNum+FILE_SUFFIX);
			
			leftWriter = new BufferedWriter(new FileWriter(leftFile),CACHE_SIZE);
			rightWriter = new BufferedWriter(new FileWriter(rightFile),CACHE_SIZE);
			
			ChildFile leftChild = new ChildFile(leftFile);
			ChildFile rightChild = new ChildFile(rightFile);
			
			// hashCode 的范围作为分割线
			while ((line = reader.readLine()) != null) {
				hashCode = line.hashCode();
					if (hashCode > midNum) {
						if(max_right_hashCode < hashCode || max_right_hashCode == 0){
							max_right_hashCode = hashCode;
						}else if(min_right_hashCode > hashCode || min_right_hashCode == 0){
							min_right_hashCode = hashCode;
						}
						// 按行写入缓存
						writeToFile(rightWriter, line);
				}else {
						if(max_left_hashCode < hashCode || max_left_hashCode == 0){
							max_left_hashCode = hashCode;
						}else if(min_left_hashCode > hashCode || min_left_hashCode == 0){
							min_left_hashCode = hashCode;
						}
						writeToFile(leftWriter, line);
				}
			}
			// 保存子文件信息
			leftChild.setHashCode(min_left_hashCode, max_left_hashCode);
			rightChild.setHashCode(min_right_hashCode, max_right_hashCode);
			
			closeWriter(rightWriter);
			closeWriter(leftWriter);
			closeReader(reader);

			
			// 删除原始文件,保留最原始的文件
			
			if(!origFile.getName().equals(origFileName)){
				origFile.delete();
			}
			
			// 分析子文件信息,是否写入或者迭代
			analyseChildFile(rightChild, leftChild);

		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	
	// 分析子文件信息
	public void analyseChildFile(ChildFile rightChild,ChildFile leftChild){
		// 将分割后 还是大于内存的文件保存  继续分割
		File rightFile = rightChild.getChildFile();
		if(isSurpassFileSize(rightFile)){
			bigChildFiles.add(rightChild);
		}else if(rightFile.length()>0){
			orderAndWriteToFiles(rightFile);
		}
		
		File leftFile = leftChild.getChildFile();
		if(isSurpassFileSize(leftFile)){
			bigChildFiles.add(leftChild);
		}else if(leftFile.length()>0){
			orderAndWriteToFiles(leftFile);
		}
		
		// 未超出直接内存排序,写入文件,超出继续分割,从末尾开始,不易栈深度溢出
		if(bigChildFiles.size() > 0 ){
			ChildFile e  = bigChildFiles.get(bigChildFiles.size()-1);
			bigChildFiles.remove(e);
			// 迭代分割
			partFile(e.getChildFile(), e.getMaxHashCode(), e.getMinHashCode());
		}
	}
	
	
	// 根据hashCode 值,计算存放的位置
	public File getFileByHashCode(int hashCode){
		int i = hashCode/HASH_UNIT;
		return new File(FILE_PREFIX+i+"_"+i+1+FILE_SUFFIX);
	}
	
	// 获得缓存流,这里缓存大小不能太大
	public BufferedWriter getFileWriterByHashCode(int hashCode,File file) throws IOException{
		BufferedWriter writer = null;
		writer = map.get(file);
		if(writer == null){
			writer = new BufferedWriter(new FileWriter(file,true),CACHE_SIZE);
			map.put(file, writer);
		}
		return writer;
	}
	

	// 将小文件读到内存排序除重复
	public  void orderAndWriteToFiles(File file){
		BufferedReader reader = null;
		String line = null;
		BufferedWriter totalWriter = null;
		StringBuilder sb = new StringBuilder(1000000);
		try {
			totalWriter = new BufferedWriter(new FileWriter(REQUST_FILE_NAME,true),CACHE_SIZE);
			reader = new BufferedReader(new FileReader(file));
			while((line = reader.readLine()) != null){
				if(!fileLinesMap.containsKey(line)){
					fileLinesMap.put(line, null);
					//sb.append(line+"\r\n");
					totalWriter.write(line+"\r\n");
				}
			}
			//totalWriter.write(sb.toString());
			fileLinesMap.clear();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			closeReader(reader);
			closeWriter(totalWriter);
			file.delete();
		}
	}
	

	
	// 判断该文件是否超过 内存限制
	public  boolean isSurpassFileSize(File file){
		if(file.length() == 0){
			System.out.println(file.delete());;
			return false;
		}
		return FILE_LIMIT_SIZE <  file.length();
	}
	

	// 将数据写入文件
	public  void writeToFile(BufferedWriter writer, String writeInfo) {
		try {
			writer.write(writeInfo+"\r\n");
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
		}
	}

	// 关闭流
	public  void closeReader(Reader reader) {
		if (reader != null) {
			try {
				reader.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	// 关闭流
	public  void closeWriter(Writer writer) {
		if (writer != null) {
			try {
				writer.flush();
				writer.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	// 内部类,记录子文件信息
	class ChildFile{
		// 文件 和 内容 hash 分布
		File childFile;
		long maxHashCode;
		long minHashCode;
		
		public ChildFile(File childFile){
			this.childFile = childFile;
		}
		
		public ChildFile(File childFile, long maxHashCode, long minHashCode) {
			super();
			this.childFile = childFile;
			this.maxHashCode = maxHashCode;
			this.minHashCode = minHashCode;
		}
		
		public File getChildFile() {
			return childFile;
		}
		public void setChildFile(File childFile) {
			this.childFile = childFile;
		}
		public long getMaxHashCode() {
			return maxHashCode;
		}
		public void setMaxHashCode(long maxHashCode) {
			this.maxHashCode = maxHashCode;
		}
		public long getMinHashCode() {
			return minHashCode;
		}
		public void setMinHashCode(long minHashCode) {
			this.minHashCode = minHashCode;
		}
		
		public void setHashCode(long minHashCode,long maxHashCode){
			this.setMaxHashCode(maxHashCode);
			this.setMinHashCode(minHashCode);
		}
		
	}

}

 

在数据分布均匀的均匀,不会有大量重复数据的情况下,比如用上一次的2G 文件测试,能节约5分钟左右的时间,如果全是重复数据会溢出失败,不太稳定,需要对数据做一定量的分析。

 

 

 

小结:

           1.这是对实战一方法的一些改进,在一定程度上能提高速度。小数据(100m)和第一种速度相当,在数据hashCode分布均匀,速度较快。

           2.在处理这类大数据的问题上,主要是分离法,hash分离 成内存可以容纳的文件,然后分别处理,不同的要求有不同的处理办法,但是大致方法类似。

           3.有问题,请留言,共同探讨!