大数据去除重复--实战(二)
程序员文章站
2022-04-22 18:08:43
...
关于上一篇数据去重复的问题,在结尾的时候提到,另一种思路:在url-->hashCode 根据范围写入文件的时候,不用迭代二分法,采用平均算法,也就是说根据url的大概行数,设置一个单位区间,循环遍历行的时候,根据hashCode 值,放入不同的空间,然后再放入内存去除重复,写入汇总文件。
去个例子,我文件数据2G,1.5亿行,自己设定一个区间值1000W。 也就是说0~1000W、1000W~2000W 。。。为一个区间,每行都会通过计算放入不同的区间,这样在数据均匀的情况下,分布式很均匀的,如果某一个区间值过多,超过内存限制,继续切割,这样会大大减少第一种算法带来的文件数据的重复读取的效率。
看代码,这里生产文件的代码参考前一篇文章。
package com.files; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.Reader; import java.io.Writer; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; public class PartFile2 { // 内存监控 final static Runtime currRuntime = Runtime.getRuntime (); // 最小的空闲空间,额,可以用来 智能控制,- -考虑到GC ,暂时没用 final static long MEMERY_LIMIT = 1024 * 1024 * 3; // 内存限制,我内存最多容纳的文件大小 static final long FILE_LIMIT_SIZE = 1024*1024*1; // 文件写入缓冲区 ,我默认 static final int CACHE_SIZE = 1024; // 默认文件后缀 static final String FILE_SUFFIX = ".txt"; // 临时分割的文件目录,可以删除~。~ static final String FILE_PREFIX = "test/"; // 汇总的文件名 static final String REQUST_FILE_NAME = "resultFile.txt"; // 存放小文件的,驱除重复数据 static Map<String,String> fileLinesMap = new HashMap<String,String>(10000); // 按hashCode 分割的单位 static int HASH_UNIT = 1000*1000*10; // 收缩率,HASH_UNIT 在一定范围内,数据过大,用这个调节之后继续分割 static final int shrinkage_factor = 1000; // 存放分割d static Map<File,BufferedWriter> map = new HashMap<File, BufferedWriter>(); // 存放大文件 引用,以及分割位置 static List<ChildFile> bigChildFiles = new ArrayList<ChildFile>(); static final String origFileName = "xxx.txt"; public static void main(String[] args) { long begin = System.currentTimeMillis(); new PartFile2().partFile(new File(origFileName)); long result = System.currentTimeMillis()-begin; System.out.println("除去重复时间为:"+result +" 毫秒"); // 除去重复时间为:931594 毫秒 } // 按hashCode 范围分割 public void partFile(File origFile) { String line = null; BufferedReader reader = null; int hashCode; try { reader = new BufferedReader(new FileReader(origFile)); while ((line = reader.readLine()) != null) { hashCode = line.hashCode(); File file = getFileByHashCode(hashCode); writeToFile(getFileWriterByHashCode(hashCode,file), line); } // 将集合对象可以内存排序的全部写入汇总文件,大于内存的继续分割 Iterator<Entry<File, BufferedWriter>> it = map.entrySet().iterator(); while(it.hasNext()){ Entry<File, BufferedWriter> entry = it.next(); File file = entry.getKey(); BufferedWriter writer = entry.getValue(); if(isSurpassFileSize(file)){ // 这里本想用再次细化空间,进行分割,发现当大量重复的元素的时候会溢出 // 极端的情况下,全是重复元素,即使以前的方法也不能执行,需要另外控制 // HASH_UNIT = HASH_UNIT/shrinkage_factor; // partFile(file); // 如果超出,继续分割 String[] number = file.getName().split("_"); long maxNum = Long.parseLong(number[0])*HASH_UNIT; long minNum = Long.parseLong(number[1].substring(0, number[1].indexOf(FILE_SUFFIX)))*HASH_UNIT; partFile(file,maxNum,minNum); }else{ writer.flush(); orderAndWriteToFiles(file); // 关闭刷新流 closeWriter(writer); file.delete(); } } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ closeReader(reader); shotDown(); } } // 清除临时文件 public void shotDown(){ Iterator<File> it = map.keySet().iterator(); while(it.hasNext()){ File file = it.next(); file.delete(); } } // 按hashCode 范围分割 public void partFile(File origFile,long maxNum,long minNum) { String line = null; long hashCode = 0; long max_left_hashCode = 0; long min_left_hashCode = 0; long max_right_hashCode = 0; long min_right_hashCode = 0; BufferedWriter rightWriter = null; BufferedWriter leftWriter = null; BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(origFile)); long midNum = (maxNum+minNum)/2; // 以文件hashCode 范围作为子文件名 File leftFile = new File(FILE_PREFIX+minNum+"_"+midNum+FILE_SUFFIX); File rightFile = new File(FILE_PREFIX+midNum +"_"+maxNum+FILE_SUFFIX); leftWriter = new BufferedWriter(new FileWriter(leftFile),CACHE_SIZE); rightWriter = new BufferedWriter(new FileWriter(rightFile),CACHE_SIZE); ChildFile leftChild = new ChildFile(leftFile); ChildFile rightChild = new ChildFile(rightFile); // hashCode 的范围作为分割线 while ((line = reader.readLine()) != null) { hashCode = line.hashCode(); if (hashCode > midNum) { if(max_right_hashCode < hashCode || max_right_hashCode == 0){ max_right_hashCode = hashCode; }else if(min_right_hashCode > hashCode || min_right_hashCode == 0){ min_right_hashCode = hashCode; } // 按行写入缓存 writeToFile(rightWriter, line); }else { if(max_left_hashCode < hashCode || max_left_hashCode == 0){ max_left_hashCode = hashCode; }else if(min_left_hashCode > hashCode || min_left_hashCode == 0){ min_left_hashCode = hashCode; } writeToFile(leftWriter, line); } } // 保存子文件信息 leftChild.setHashCode(min_left_hashCode, max_left_hashCode); rightChild.setHashCode(min_right_hashCode, max_right_hashCode); closeWriter(rightWriter); closeWriter(leftWriter); closeReader(reader); // 删除原始文件,保留最原始的文件 if(!origFile.getName().equals(origFileName)){ origFile.delete(); } // 分析子文件信息,是否写入或者迭代 analyseChildFile(rightChild, leftChild); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } // 分析子文件信息 public void analyseChildFile(ChildFile rightChild,ChildFile leftChild){ // 将分割后 还是大于内存的文件保存 继续分割 File rightFile = rightChild.getChildFile(); if(isSurpassFileSize(rightFile)){ bigChildFiles.add(rightChild); }else if(rightFile.length()>0){ orderAndWriteToFiles(rightFile); } File leftFile = leftChild.getChildFile(); if(isSurpassFileSize(leftFile)){ bigChildFiles.add(leftChild); }else if(leftFile.length()>0){ orderAndWriteToFiles(leftFile); } // 未超出直接内存排序,写入文件,超出继续分割,从末尾开始,不易栈深度溢出 if(bigChildFiles.size() > 0 ){ ChildFile e = bigChildFiles.get(bigChildFiles.size()-1); bigChildFiles.remove(e); // 迭代分割 partFile(e.getChildFile(), e.getMaxHashCode(), e.getMinHashCode()); } } // 根据hashCode 值,计算存放的位置 public File getFileByHashCode(int hashCode){ int i = hashCode/HASH_UNIT; return new File(FILE_PREFIX+i+"_"+i+1+FILE_SUFFIX); } // 获得缓存流,这里缓存大小不能太大 public BufferedWriter getFileWriterByHashCode(int hashCode,File file) throws IOException{ BufferedWriter writer = null; writer = map.get(file); if(writer == null){ writer = new BufferedWriter(new FileWriter(file,true),CACHE_SIZE); map.put(file, writer); } return writer; } // 将小文件读到内存排序除重复 public void orderAndWriteToFiles(File file){ BufferedReader reader = null; String line = null; BufferedWriter totalWriter = null; StringBuilder sb = new StringBuilder(1000000); try { totalWriter = new BufferedWriter(new FileWriter(REQUST_FILE_NAME,true),CACHE_SIZE); reader = new BufferedReader(new FileReader(file)); while((line = reader.readLine()) != null){ if(!fileLinesMap.containsKey(line)){ fileLinesMap.put(line, null); //sb.append(line+"\r\n"); totalWriter.write(line+"\r\n"); } } //totalWriter.write(sb.toString()); fileLinesMap.clear(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ closeReader(reader); closeWriter(totalWriter); file.delete(); } } // 判断该文件是否超过 内存限制 public boolean isSurpassFileSize(File file){ if(file.length() == 0){ System.out.println(file.delete());; return false; } return FILE_LIMIT_SIZE < file.length(); } // 将数据写入文件 public void writeToFile(BufferedWriter writer, String writeInfo) { try { writer.write(writeInfo+"\r\n"); } catch (IOException e) { e.printStackTrace(); }finally{ } } // 关闭流 public void closeReader(Reader reader) { if (reader != null) { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } } // 关闭流 public void closeWriter(Writer writer) { if (writer != null) { try { writer.flush(); writer.close(); } catch (IOException e) { e.printStackTrace(); } } } // 内部类,记录子文件信息 class ChildFile{ // 文件 和 内容 hash 分布 File childFile; long maxHashCode; long minHashCode; public ChildFile(File childFile){ this.childFile = childFile; } public ChildFile(File childFile, long maxHashCode, long minHashCode) { super(); this.childFile = childFile; this.maxHashCode = maxHashCode; this.minHashCode = minHashCode; } public File getChildFile() { return childFile; } public void setChildFile(File childFile) { this.childFile = childFile; } public long getMaxHashCode() { return maxHashCode; } public void setMaxHashCode(long maxHashCode) { this.maxHashCode = maxHashCode; } public long getMinHashCode() { return minHashCode; } public void setMinHashCode(long minHashCode) { this.minHashCode = minHashCode; } public void setHashCode(long minHashCode,long maxHashCode){ this.setMaxHashCode(maxHashCode); this.setMinHashCode(minHashCode); } } }
在数据分布均匀的均匀,不会有大量重复数据的情况下,比如用上一次的2G 文件测试,能节约5分钟左右的时间,如果全是重复数据会溢出失败,不太稳定,需要对数据做一定量的分析。
小结:
1.这是对实战一方法的一些改进,在一定程度上能提高速度。小数据(100m)和第一种速度相当,在数据hashCode分布均匀,速度较快。
2.在处理这类大数据的问题上,主要是分离法,hash分离 成内存可以容纳的文件,然后分别处理,不同的要求有不同的处理办法,但是大致方法类似。
3.有问题,请留言,共同探讨!