java使用多线程读取超大文件
程序员文章站
2022-06-03 11:42:37
接上次写的“java读取超大文件”。在读取超过10g的文件时会发现一次读一行的速度实在是不能接受,想到使用多线程+filechannel来做一个使用多线程版本。
基本思路...
接上次写的“java读取超大文件”。在读取超过10g的文件时会发现一次读一行的速度实在是不能接受,想到使用多线程+filechannel来做一个使用多线程版本。
基本思路如下:
1.计算出文件总大小
2.分段处理,计算出每个线程读取文件的开始与结束位置
(文件大小/线程数)*n,n是指第几个线程,这样能得到每个线程在读该文件的大概起始位置
使用"大概起始位置",作为读文件的开始偏移量(filechannel.position("大概起始位置")),来读取该文件,直到读到第一个换行符,记录下这个换行符的位置,作为该线程的准确起 始位置.同时它也是上一个线程的结束位置.最后一个线程的结束位置也直接设置为-1
3.启动线程,每个线程从开始位置读取到结束位置为止
代码如下:
读文件工具类
import java.io.*; import java.nio.bytebuffer; import java.nio.channels.filechannel; import java.util.observable; /** * created with intellij idea. * user: okey * date: 14-4-2 * time: 下午3:12 * 读取文件 */ public class readfile extends observable { private int bufsize = 1024; // 换行符 private byte key = "\n".getbytes()[0]; // 当前行数 private long linenum = 0; // 文件编码,默认为gb2312 private string encode = "gb2312"; // 具体业务逻辑监听器 private readerfilelistener readerlistener; public void setencode(string encode) { this.encode = encode; } public void setreaderlistener(readerfilelistener readerlistener) { this.readerlistener = readerlistener; } /** * 获取准确开始位置 * @param file * @param position * @return * @throws exception */ public long getstartnum(file file, long position) throws exception { long startnum = position; filechannel fcin = new randomaccessfile(file, "r").getchannel(); fcin.position(position); try { int cache = 1024; bytebuffer rbuffer = bytebuffer.allocate(cache); // 每次读取的内容 byte[] bs = new byte[cache]; // 缓存 byte[] tempbs = new byte[0]; string line = ""; while (fcin.read(rbuffer) != -1) { int rsize = rbuffer.position(); rbuffer.rewind(); rbuffer.get(bs); rbuffer.clear(); byte[] newstrbyte = bs; // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面 if (null != tempbs) { int tl = tempbs.length; newstrbyte = new byte[rsize + tl]; system.arraycopy(tempbs, 0, newstrbyte, 0, tl); system.arraycopy(bs, 0, newstrbyte, tl, rsize); } // 获取开始位置之后的第一个换行符 int endindex = indexof(newstrbyte, 0); if (endindex != -1) { return startnum + endindex; } tempbs = substring(newstrbyte, 0, newstrbyte.length); startnum += 1024; } } catch (exception e) { e.printstacktrace(); } finally { fcin.close(); } return position; } /** * 从设置的开始位置读取文件,一直到结束为止。如果 end设置为负数,刚读取到文件末尾 * @param fullpath * @param start * @param end * @throws exception */ public void readfilebyline(string fullpath, long start, long end) throws exception { file fin = new file(fullpath); if (fin.exists()) { filechannel fcin = new randomaccessfile(fin, "r").getchannel(); fcin.position(start); try { bytebuffer rbuffer = bytebuffer.allocate(bufsize); // 每次读取的内容 byte[] bs = new byte[bufsize]; // 缓存 byte[] tempbs = new byte[0]; string line = ""; // 当前读取文件位置 long nowcur = start; while (fcin.read(rbuffer) != -1) { nowcur += bufsize; int rsize = rbuffer.position(); rbuffer.rewind(); rbuffer.get(bs); rbuffer.clear(); byte[] newstrbyte = bs; // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面 if (null != tempbs) { int tl = tempbs.length; newstrbyte = new byte[rsize + tl]; system.arraycopy(tempbs, 0, newstrbyte, 0, tl); system.arraycopy(bs, 0, newstrbyte, tl, rsize); } // 是否已经读到最后一位 boolean isend = false; // 如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置 if (end > 0 && nowcur > end) { // 缓存长度 - 当前已经读取位数 - 最后位数 int l = newstrbyte.length - (int) (nowcur - end); newstrbyte = substring(newstrbyte, 0, l); isend = true; } int fromindex = 0; int endindex = 0; // 每次读一行内容,以 key(默认为\n) 作为结束符 while ((endindex = indexof(newstrbyte, fromindex)) != -1) { byte[] bline = substring(newstrbyte, fromindex, endindex); line = new string(bline, 0, bline.length, encode); linenum++; // 输出一行内容,处理方式由调用方提供 readerlistener.outline(line.trim(), linenum, false); fromindex = endindex + 1; } // 将未读取完成的内容放到缓存中 tempbs = substring(newstrbyte, fromindex, newstrbyte.length); if (isend) { break; } } // 将剩下的最后内容作为一行,输出,并指明这是最后一行 string linestr = new string(tempbs, 0, tempbs.length, encode); readerlistener.outline(linestr.trim(), linenum, true); } catch (exception e) { e.printstacktrace(); } finally { fcin.close(); } } else { throw new filenotfoundexception("没有找到文件:" + fullpath); } // 通知观察者,当前工作已经完成 setchanged(); notifyobservers(start+"-"+end); } /** * 查找一个byte[]从指定位置之后的一个换行符位置 * * @param src * @param fromindex * @return * @throws exception */ private int indexof(byte[] src, int fromindex) throws exception { for (int i = fromindex; i < src.length; i++) { if (src[i] == key) { return i; } } return -1; } /** * 从指定开始位置读取一个byte[]直到指定结束位置为止生成一个全新的byte[] * * @param src * @param fromindex * @param endindex * @return * @throws exception */ private byte[] substring(byte[] src, int fromindex, int endindex) throws exception { int size = endindex - fromindex; byte[] ret = new byte[size]; system.arraycopy(src, fromindex, ret, 0, size); return ret; } }
读文件线程
/** * created with intellij idea. * user: okey * date: 14-4-2 * time: 下午4:50 * to change this template use file | settings | file templates. */ public class readfilethread extends thread { private readerfilelistener processpoidatalisteners; private string filepath; private long start; private long end; public readfilethread(readerfilelistener processpoidatalisteners,long start,long end,string file) { this.setname(this.getname()+"-readfilethread"); this.start = start; this.end = end; this.filepath = file; this.processpoidatalisteners = processpoidatalisteners; } @override public void run() { readfile readfile = new readfile(); readfile.setreaderlistener(processpoidatalisteners); readfile.setencode(processpoidatalisteners.getencode()); // readfile.addobserver(); try { readfile.readfilebyline(filepath, start, end + 1); } catch (exception e) { e.printstacktrace(); } } }
具体业务逻辑监听
/** * created with okey * user: okey * date: 13-3-14 * time: 下午3:19 * nio逐行读数据回调方法 */ public abstract class readerfilelistener { // 一次读取行数,默认为500 private int readcolnum = 500; private string encode; private list<string> list = new arraylist<string>(); /** * 设置一次读取行数 * @param readcolnum */ protected void setreadcolnum(int readcolnum) { this.readcolnum = readcolnum; } public string getencode() { return encode; } public void setencode(string encode) { this.encode = encode; } /** * 每读取到一行数据,添加到缓存中 * @param linestr 读取到的数据 * @param linenum 行号 * @param over 是否读取完成 * @throws exception */ public void outline(string linestr, long linenum, boolean over) throws exception { if(null != linestr) list.add(linestr); if (!over && (linenum % readcolnum == 0)) { output(list); list.clear(); } else if (over) { output(list); list.clear(); } } /** * 批量输出 * * @param stringlist * @throws exception */ public abstract void output(list<string> stringlist) throws exception; }
线程调度
import java.io.file; import java.io.fileinputstream; import java.io.ioexception; /** * created with intellij idea. * user: okey * date: 14-4-1 * time: 下午6:03 * to change this template use file | settings | file templates. */ public class builddata { public static void main(string[] args) throws exception { file file = new file("e:\\1396341974289.csv"); fileinputstream fis = null; try { readfile readfile = new readfile(); fis = new fileinputstream(file); int available = fis.available(); int maxthreadnum = 50; // 线程粗略开始位置 int i = available / maxthreadnum; for (int j = 0; j < maxthreadnum; j++) { // 计算精确开始位置 long startnum = j == 0 ? 0 : readfile.getstartnum(file, i * j); long endnum = j + 1 < maxthreadnum ? readfile.getstartnum(file, i * (j + 1)) : -2; // 具体监听实现 processdatabypostgislisteners listeners = new processdatabypostgislisteners("gbk"); new readfilethread(listeners, startnum, endnum, file.getpath()).start(); } } catch (ioexception e) { e.printstacktrace(); } catch (exception e) { e.printstacktrace(); } } }
现在就可以尽情的调整 maxthreadnum来享受风一般的速度吧!
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。