欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java使用多线程读取超大文件

程序员文章站 2022-06-03 11:42:37
接上次写的“java读取超大文件”。在读取超过10g的文件时会发现一次读一行的速度实在是不能接受,想到使用多线程+filechannel来做一个使用多线程版本。 基本思路...

接上次写的“java读取超大文件”。在读取超过10g的文件时会发现一次读一行的速度实在是不能接受,想到使用多线程+filechannel来做一个使用多线程版本。

基本思路如下:

1.计算出文件总大小

2.分段处理,计算出每个线程读取文件的开始与结束位置

  (文件大小/线程数)*n,n是指第几个线程,这样能得到每个线程在读该文件的大概起始位置

使用"大概起始位置",作为读文件的开始偏移量(filechannel.position("大概起始位置")),来读取该文件,直到读到第一个换行符,记录下这个换行符的位置,作为该线程的准确起 始位置.同时它也是上一个线程的结束位置.最后一个线程的结束位置也直接设置为-1

3.启动线程,每个线程从开始位置读取到结束位置为止

代码如下:

读文件工具类

import java.io.*;
import java.nio.bytebuffer;
import java.nio.channels.filechannel;
import java.util.observable;
 
/**
 * created with intellij idea.
 * user: okey
 * date: 14-4-2
 * time: 下午3:12
 * 读取文件
 */
public class readfile extends observable {
 
 private int bufsize = 1024;
 // 换行符
 private byte key = "\n".getbytes()[0];
 // 当前行数
 private long linenum = 0;
 // 文件编码,默认为gb2312
 private string encode = "gb2312";
 // 具体业务逻辑监听器
 private readerfilelistener readerlistener;
 
 public void setencode(string encode) {
  this.encode = encode;
 }
 
 public void setreaderlistener(readerfilelistener readerlistener) {
  this.readerlistener = readerlistener;
 }
 
 /**
  * 获取准确开始位置
  * @param file
  * @param position
  * @return
  * @throws exception
  */
 public long getstartnum(file file, long position) throws exception {
  long startnum = position;
  filechannel fcin = new randomaccessfile(file, "r").getchannel();
  fcin.position(position);
  try {
   int cache = 1024;
   bytebuffer rbuffer = bytebuffer.allocate(cache);
   // 每次读取的内容
   byte[] bs = new byte[cache];
   // 缓存
   byte[] tempbs = new byte[0];
   string line = "";
   while (fcin.read(rbuffer) != -1) {
    int rsize = rbuffer.position();
    rbuffer.rewind();
    rbuffer.get(bs);
    rbuffer.clear();
    byte[] newstrbyte = bs;
    // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
    if (null != tempbs) {
     int tl = tempbs.length;
     newstrbyte = new byte[rsize + tl];
     system.arraycopy(tempbs, 0, newstrbyte, 0, tl);
     system.arraycopy(bs, 0, newstrbyte, tl, rsize);
    }
    // 获取开始位置之后的第一个换行符
    int endindex = indexof(newstrbyte, 0);
    if (endindex != -1) {
     return startnum + endindex;
    }
    tempbs = substring(newstrbyte, 0, newstrbyte.length);
    startnum += 1024;
   }
  } catch (exception e) {
   e.printstacktrace();
  } finally {
   fcin.close();
  }
  return position;
 }
 
 /**
  * 从设置的开始位置读取文件,一直到结束为止。如果 end设置为负数,刚读取到文件末尾
  * @param fullpath
  * @param start
  * @param end
  * @throws exception
  */
 public void readfilebyline(string fullpath, long start, long end) throws exception {
  file fin = new file(fullpath);
  if (fin.exists()) {
   filechannel fcin = new randomaccessfile(fin, "r").getchannel();
   fcin.position(start);
   try {
    bytebuffer rbuffer = bytebuffer.allocate(bufsize);
    // 每次读取的内容
    byte[] bs = new byte[bufsize];
    // 缓存
    byte[] tempbs = new byte[0];
    string line = "";
    // 当前读取文件位置
    long nowcur = start;
    while (fcin.read(rbuffer) != -1) {
     nowcur += bufsize;
 
     int rsize = rbuffer.position();
     rbuffer.rewind();
     rbuffer.get(bs);
     rbuffer.clear();
     byte[] newstrbyte = bs;
     // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
     if (null != tempbs) {
      int tl = tempbs.length;
      newstrbyte = new byte[rsize + tl];
      system.arraycopy(tempbs, 0, newstrbyte, 0, tl);
      system.arraycopy(bs, 0, newstrbyte, tl, rsize);
     }
     // 是否已经读到最后一位
     boolean isend = false;
     // 如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置
     if (end > 0 && nowcur > end) {
      // 缓存长度 - 当前已经读取位数 - 最后位数
      int l = newstrbyte.length - (int) (nowcur - end);
      newstrbyte = substring(newstrbyte, 0, l);
      isend = true;
     }
     int fromindex = 0;
     int endindex = 0;
     // 每次读一行内容,以 key(默认为\n) 作为结束符
     while ((endindex = indexof(newstrbyte, fromindex)) != -1) {
      byte[] bline = substring(newstrbyte, fromindex, endindex);
      line = new string(bline, 0, bline.length, encode);
      linenum++;
      // 输出一行内容,处理方式由调用方提供
      readerlistener.outline(line.trim(), linenum, false);
      fromindex = endindex + 1;
     }
     // 将未读取完成的内容放到缓存中
     tempbs = substring(newstrbyte, fromindex, newstrbyte.length);
     if (isend) {
      break;
     }
    }
    // 将剩下的最后内容作为一行,输出,并指明这是最后一行
    string linestr = new string(tempbs, 0, tempbs.length, encode);
    readerlistener.outline(linestr.trim(), linenum, true);
   } catch (exception e) {
    e.printstacktrace();
   } finally {
    fcin.close();
   }
 
  } else {
   throw new filenotfoundexception("没有找到文件:" + fullpath);
  }
  // 通知观察者,当前工作已经完成
  setchanged();
  notifyobservers(start+"-"+end);
 }
 
 /**
  * 查找一个byte[]从指定位置之后的一个换行符位置
  *
  * @param src
  * @param fromindex
  * @return
  * @throws exception
  */
 private int indexof(byte[] src, int fromindex) throws exception {
 
  for (int i = fromindex; i < src.length; i++) {
   if (src[i] == key) {
    return i;
   }
  }
  return -1;
 }
 
 /**
  * 从指定开始位置读取一个byte[]直到指定结束位置为止生成一个全新的byte[]
  *
  * @param src
  * @param fromindex
  * @param endindex
  * @return
  * @throws exception
  */
 private byte[] substring(byte[] src, int fromindex, int endindex) throws exception {
  int size = endindex - fromindex;
  byte[] ret = new byte[size];
  system.arraycopy(src, fromindex, ret, 0, size);
  return ret;
 }
 
}

读文件线程

/**
 * created with intellij idea.
 * user: okey
 * date: 14-4-2
 * time: 下午4:50
 * to change this template use file | settings | file templates.
 */
public class readfilethread extends thread {
 
 private readerfilelistener processpoidatalisteners;
 private string filepath;
 private long start;
 private long end;
 
 public readfilethread(readerfilelistener processpoidatalisteners,long start,long end,string file) {
  this.setname(this.getname()+"-readfilethread");
  this.start = start;
  this.end = end;
  this.filepath = file;
  this.processpoidatalisteners = processpoidatalisteners;
 }
 
 @override
 public void run() {
  readfile readfile = new readfile();
  readfile.setreaderlistener(processpoidatalisteners);
  readfile.setencode(processpoidatalisteners.getencode());
//  readfile.addobserver();
  try {
   readfile.readfilebyline(filepath, start, end + 1);
  } catch (exception e) {
   e.printstacktrace();
  }
 }
}

具体业务逻辑监听

/**
 * created with okey
 * user: okey
 * date: 13-3-14
 * time: 下午3:19
 * nio逐行读数据回调方法
 */
public abstract class readerfilelistener {
 
 // 一次读取行数,默认为500
 private int readcolnum = 500;
 
 private string encode;
 
 private list<string> list = new arraylist<string>();
 
 /**
  * 设置一次读取行数
  * @param readcolnum
  */
 protected void setreadcolnum(int readcolnum) {
  this.readcolnum = readcolnum;
 }
 
 public string getencode() {
  return encode;
 }
 
 public void setencode(string encode) {
  this.encode = encode;
 }
 
 /**
  * 每读取到一行数据,添加到缓存中
  * @param linestr 读取到的数据
  * @param linenum 行号
  * @param over 是否读取完成
  * @throws exception
  */
 public void outline(string linestr, long linenum, boolean over) throws exception {
  if(null != linestr)
   list.add(linestr);
  if (!over && (linenum % readcolnum == 0)) {
   output(list);
   list.clear();
  } else if (over) {
   output(list);
   list.clear();
  }
 }
 
 /**
  * 批量输出
  *
  * @param stringlist
  * @throws exception
  */
 public abstract void output(list<string> stringlist) throws exception;
 
}

线程调度

import java.io.file;
import java.io.fileinputstream;
import java.io.ioexception;
 
/**
 * created with intellij idea.
 * user: okey
 * date: 14-4-1
 * time: 下午6:03
 * to change this template use file | settings | file templates.
 */
public class builddata {
 public static void main(string[] args) throws exception {
  file file = new file("e:\\1396341974289.csv");
  fileinputstream fis = null;
  try {
   readfile readfile = new readfile();
   fis = new fileinputstream(file);
   int available = fis.available();
   int maxthreadnum = 50;
   // 线程粗略开始位置
   int i = available / maxthreadnum;
   for (int j = 0; j < maxthreadnum; j++) {
    // 计算精确开始位置
    long startnum = j == 0 ? 0 : readfile.getstartnum(file, i * j);
    long endnum = j + 1 < maxthreadnum ? readfile.getstartnum(file, i * (j + 1)) : -2;
    // 具体监听实现
    processdatabypostgislisteners listeners = new processdatabypostgislisteners("gbk");
    new readfilethread(listeners, startnum, endnum, file.getpath()).start();
   }
  } catch (ioexception e) {
   e.printstacktrace();
  } catch (exception e) {
   e.printstacktrace();
  }
 }
}

现在就可以尽情的调整 maxthreadnum来享受风一般的速度吧!

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。