A Java Multithreaded Program for Quickly Splitting Files

程序员文章站 2024-03-12 17:38:14
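A while ago I needed to import a large batch of data. The DBA supplied CSV files, each several GB in size. Loading them directly was very slow on the database side and also caused out-of-memory problems, so I wrote a program that quickly splits such files into smaller parts.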
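
The core of size-based splitting is offset arithmetic: part i starts at i × blockSize, and the last part takes whatever bytes remain. A minimal sketch of that calculation follows; `ChunkPlanner`, `Range` and `plan` are hypothetical names chosen for illustration and are not part of the program.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the original program): plans fixed-size byte ranges. */
class ChunkPlanner {

    /** One byte range: start offset and length in bytes. */
    static final class Range {
        final long start;
        final int size;

        Range(long start, int size) {
            this.start = start;
            this.size = size;
        }

        @Override
        public String toString() {
            return "[" + start + ", +" + size + ")";
        }
    }

    /** Splits [0, totalLen) into consecutive ranges of at most blockSize bytes. */
    static List<Range> plan(long totalLen, int blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<Range> ranges = new ArrayList<>();
        for (long start = 0; start < totalLen; start += blockSize) {
            // The last range may be shorter than blockSize
            int size = (int) Math.min(blockSize, totalLen - start);
            ranges.add(new Range(start, size));
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (Range r : plan(250, 100)) {
            System.out.println(r);
        }
    }
}
```

For a 250-byte input and a block size of 100 this yields three ranges of 100, 100 and 50 bytes. Using `long` for the start offset matters for files larger than 2 GB.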

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class FileSplitUtil {

  private final static Logger log = LogManager.getLogger(FileSplitUtil.class);
  // Files smaller than this (100 MB) are not split
  private static final long originFileSize = 1024 * 1024 * 100;
  // Size of each part (64 MB). Author's note: must be a power of two to avoid garbled Chinese text.
  // A power of two alone does not guarantee that a variable-width GBK character is never cut.
  private static final int blockFileSize = 1024 * 1024 * 64;
  /**
   * CSV field separator
   */
  private static final char cvsSeparator = '^';

  public static void main(String args[]) {
    long start = System.currentTimeMillis();
    try {
      String fileName = "d:\\csvtest\\aa.csv";
      File sourceFile = new File(fileName);
      if (sourceFile.length() >= originFileSize) {
        // Normalize Windows backslashes to forward slashes
        String cvsFileName = fileName.replaceAll("\\\\", "/");
        FileSplitUtil fileSplitUtil = new FileSplitUtil();
        List<String> parts = fileSplitUtil.splitBySize(cvsFileName, blockFileSize);
        for (String part : parts) {
          System.out.println("partName is:" + part);
        }
      }
      System.out.println("Total file length " + sourceFile.length() + ", split took: " + (System.currentTimeMillis() - start) + "ms.");
    } catch (Exception e) {
      // Pass the exception itself so the stack trace is logged
      log.error("Failed to split file", e);
    }

  }
 
 
 
  /**
   * Splits a file into parts
   *
   * @param fileName full name of the file to split
   * @param byteSize size of each part in bytes
   * @return names of the part files
   */
  public List<String> splitBySize(String fileName, int byteSize)
      throws IOException, InterruptedException {
    List<String> parts = new ArrayList<String>();
    File file = new File(fileName);
    long totalLen = file.length();
    int count = (int) Math.ceil(totalLen / (double) byteSize);
    int countLen = (count + "").length();
    CountDownLatch latch = new CountDownLatch(count);
    // A bounded pool writes the parts in parallel. Calling run() directly would execute
    // every part on the calling thread, and each task buffers a whole part in memory.
    ExecutorService pool = Executors.newFixedThreadPool(
        Math.max(1, Math.min(count, Runtime.getRuntime().availableProcessors())));

    for (int i = 0; i < count; i++) {
      String partFileName = file.getPath() + "."
          + leftPad((i + 1) + "", countLen, '0') + ".cvs";
      int readSize = byteSize;
      long startPos = (long) i * byteSize;
      long nextPos = (long) (i + 1) * byteSize;
      if (nextPos > totalLen) {
        // The last part holds only the remaining bytes
        readSize = (int) (totalLen - startPos);
      }
      pool.execute(new SplitRunnable(readSize, startPos, partFileName, file, latch));
      parts.add(partFileName);
    }
    latch.await(); // wait until all parts have been written
    pool.shutdown();
    // Cutting by size may break a row in two, so post-process the parts and merge broken rows
    mergeRow(parts);
    return parts;
  }
 
  /**
   * Runnable that writes one part of the file
   *
   * @author supeidong
   */
  private class SplitRunnable implements Runnable {
    int byteSize;
    String partFileName;
    File originFile;
    long startPos;
    CountDownLatch latch;

    public SplitRunnable(int byteSize, long startPos, String partFileName,
               File originFile, CountDownLatch latch) {
      this.startPos = startPos;
      this.byteSize = byteSize;
      this.partFileName = partFileName;
      this.originFile = originFile;
      this.latch = latch;
    }

    public void run() {
      // try-with-resources closes both files even if an exception is thrown
      try (RandomAccessFile rFile = new RandomAccessFile(originFile, "r");
           OutputStream os = new FileOutputStream(partFileName)) {
        byte[] b = new byte[byteSize];
        rFile.seek(startPos); // move the pointer to the start of this part
        rFile.readFully(b); // read() alone may return fewer bytes than requested
        os.write(b);
      } catch (IOException e) {
        log.error(e.getMessage(), e);
      } finally {
        // Always count down so the waiting thread is released
        latch.countDown();
      }
    }
  }
 
  /**
   * Merges rows that were cut in two
   *
   * @param parts names of the part files
   */
  private void mergeRow(List<String> parts) {
    List<PartFile> partFiles = new ArrayList<PartFile>();
    try {
      // Build a PartFile descriptor for each part
      for (int i = 0; i < parts.size(); i++) {
        String partFileName = parts.get(i);
        File splitFileTemp = new File(partFileName);
        if (splitFileTemp.exists()) {
          PartFile partFile = new PartFile();
          try (BufferedReader reader = new BufferedReader(
              new InputStreamReader(new FileInputStream(splitFileTemp), "GBK"))) {
            String firstRow = reader.readLine();
            String secondRow = reader.readLine();
            String endRow = readLastLine(partFileName);
            partFile.setPartFileName(partFileName);
            partFile.setFirstRow(firstRow);
            partFile.setEndRow(endRow);
            if (i >= 1) {
              String prePartFile = parts.get(i - 1);
              String preEndRow = readLastLine(prePartFile);
              // If the previous part's last line plus this first row contain more separators
              // than a regular row, the cut fell on a row boundary and the first row is complete
              partFile.setFirstIsFull(getCharCount(firstRow + preEndRow) > getCharCount(secondRow));
            }
          }
          partFiles.add(partFile);
        }
      }
      // Append the next part's first row to the end of the preceding part.
      // Only the preceding part is written; the next part itself is not modified.
      for (int i = 0; i < partFiles.size() - 1; i++) {
        PartFile partFile = partFiles.get(i);
        PartFile partFileNext = partFiles.get(i + 1);
        StringBuilder sb = new StringBuilder();
        if (partFileNext.getFirstIsFull()) {
          // A complete row goes on a line of its own
          sb.append("\r\n");
        }
        sb.append(partFileNext.getFirstRow());
        writeLastLine(partFile.getPartFileName(), sb.toString());
      }
    } catch (Exception e) {
      log.error(e.getMessage(), e);
    }
  }
 
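  /**
   * Counts how many times the separator occurs in a string
   * @param s the string to scan, may be null
   * @return number of separator characters in s
   */
  private int getCharCount(String s) {
    if (s == null) {
      // A part with fewer than two lines yields a null row
      return 0;
    }
    int count = 0;
    for (int i = 0; i < s.length(); i++) {
      if (s.charAt(i) == cvsSeparator) {
        count++;
      }
    }
    return count;
  }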
 
  /**
   * Counts the lines in a file using a BufferedInputStream
   *
   * @param fileName name of the file to read
   * @return number of '\n' bytes in the file
   */
  public int getFileRow(String fileName) throws IOException {
    try (InputStream is = new BufferedInputStream(new FileInputStream(fileName))) {
      byte[] c = new byte[1024];
      int count = 0;
      int readChars = 0;
      while ((readChars = is.read(c)) != -1) {
        for (int i = 0; i < readChars; ++i) {
          if (c[i] == '\n')
            ++count;
        }
      }
      return count;
    }
  }
 
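  /**
   * Reads the last line of a file
   * @param fileName name of the file to read
   * @return the last line, or an empty string if no line break is found
   * @throws IOException if the file cannot be read
   */
  private String readLastLine(String fileName) throws IOException {
    // Use RandomAccessFile to scan backwards from the end for the last line
    String lastLine = "";
    try (RandomAccessFile raf = new RandomAccessFile(fileName, "r")) {
      long len = raf.length();
      if (len != 0L) {
        long pos = len - 1;
        while (pos > 0) {
          pos--;
          raf.seek(pos);
          if (raf.readByte() == '\n') {
            lastLine = raf.readLine();
            // readLine() maps each byte to one char, so recover the bytes and decode them as GBK
            lastLine = new String(lastLine.getBytes("ISO-8859-1"), "GBK");
            break;
          }
        }
      }
    }
    return lastLine;
  }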
  /**
   * Appends text to the end of a file
   * @param fileName name of the file to modify
   * @param lastString text to append
   */
  private void writeLastLine(String fileName, String lastString) {
    // Open a random-access file stream in read-write mode
    try (RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw")) {
      // File length in bytes
      long fileLength = randomFile.length();
      // Move the write pointer to the end of the file
      randomFile.seek(fileLength);
      // GBK must be specified here, otherwise the written text is garbled
      randomFile.write(lastString.getBytes("GBK"));
    } catch (IOException e) {
      log.error(e.getMessage(), e);
    }
  }
  /**
   * Left-pads a string
   *
   * @param str string to pad
   * @param length target length
   * @param ch padding character
   * @return str padded on the left to the target length, or str itself if already long enough
   */
  public static String leftPad(String str, int length, char ch) {
    if (str.length() >= length) {
      return str;
    }
    char[] chs = new char[length];
    Arrays.fill(chs, ch);
    char[] src = str.toCharArray();
    System.arraycopy(src, 0, chs, length - src.length, src.length);
    return new String(chs);
  }
 
  /**
   * Inner class describing one part file for row merging
   */
  class PartFile {
    private String partFileName;
    private String firstRow;
    private String endRow;
    private boolean firstIsFull;

    public String getPartFileName() {
      return partFileName;
    }

    public void setPartFileName(String partFileName) {
      this.partFileName = partFileName;
    }

    public String getFirstRow() {
      return firstRow;
    }

    public void setFirstRow(String firstRow) {
      this.firstRow = firstRow;
    }

    public String getEndRow() {
      return endRow;
    }

    public void setEndRow(String endRow) {
      this.endRow = endRow;
    }

    public boolean getFirstIsFull() {
      return firstIsFull;
    }

    public void setFirstIsFull(boolean firstIsFull) {
      this.firstIsFull = firstIsFull;
    }
  }
 
}
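
The listing above cuts at fixed byte offsets and repairs broken rows afterwards. One alternative, shown here only as a sketch and not as the author's method, is to move each cut forward to the next line break so that no row is ever split. `LineAlignedBoundaries` and `find` are hypothetical names; the sketch assumes rows end with '\n' and that the newline byte never occurs inside a multi-byte character, which holds for GBK and UTF-8.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the original program): finds cut points on row boundaries. */
class LineAlignedBoundaries {

    /** Returns the exclusive end offset of each part; every cut lies just after a '\n' or at EOF. */
    static List<Long> find(String fileName, long blockSize) throws IOException {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<Long> cuts = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(fileName, "r")) {
            long len = raf.length();
            long pos = 0;
            while (pos < len) {
                long end = pos + blockSize;
                if (end >= len) {
                    end = len; // last part: take everything that is left
                } else {
                    // Start at the last byte of the nominal block and scan forward to a '\n'
                    raf.seek(end - 1);
                    int b;
                    while ((b = raf.read()) != -1 && b != '\n') {
                        // keep scanning
                    }
                    end = (b == -1) ? len : raf.getFilePointer();
                }
                cuts.add(end);
                pos = end;
            }
        }
        return cuts;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("rows", ".csv");
        Files.write(tmp, "aaa\nbbbbb\ncc\n".getBytes(StandardCharsets.US_ASCII));
        System.out.println(find(tmp.toString(), 5)); // prints [10, 13]
        Files.delete(tmp);
    }
}
```

Each returned offset is the exclusive end of one part, so a part can exceed the block size by up to one row. The byte-by-byte scan is kept for clarity; a real implementation would likely read a small buffer instead.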

That is all for this article. I hope it helps you in learning Java programming.