Java多线程实现快速切分文件的程序
程序员文章站
2024-03-12 17:38:14
前段时间需要进行大批量数据导入,DBA提供的是CSV文件,但每个CSV文件都有好几个GB大小,直接进行load时数据库很慢,还会产生内存不足的问题。为了解决这个问题,写了个...
前段时间需要进行大批量数据导入,DBA提供的是CSV文件,但每个CSV文件都有好几个GB大小,直接进行load时数据库很慢,还会产生内存不足的问题。为了解决这个问题,写了个多线程快速切分文件的程序。
import org.apache.log4j.logmanager; import org.apache.log4j.logger; import java.io.*; import java.util.*; import java.util.concurrent.*; public class filesplitutil { private final static logger log = logmanager.getlogger(filesplitutil.class); private static final long originfilesize = 1024 * 1024 * 100;// 100m private static final int blockfilesize = 1024 * 1024 * 64;// 防止中文乱码,必须取2的n次方 /** * cvs文件分隔符 */ private static final char cvsseparator = '^'; public static void main(string args[]){ long start = system.currenttimemillis(); try { string filename = "d:\\csvtest\\aa.csv"; file sourcefile = new file(filename); if (sourcefile.length() >= originfilesize) { string cvsfilename = filename.replaceall("\\\\", "/"); filesplitutil filesplitutil = new filesplitutil(); list<string> parts=filesplitutil.splitbysize(cvsfilename, blockfilesize); for(string part:parts){ system.out.println("partname is:"+part); } } system.out.println("总文件长度"+sourcefile.length()+",拆分文件耗时:" + (system.currenttimemillis() - start) + "ms."); }catch (exception e){ log.info(e.getstacktrace()); } } /** * 拆分文件 * * @param filename 待拆分的完整文件名 * @param bytesize 按多少字节大小拆分 * @return 拆分后的文件名列表 */ public list<string> splitbysize(string filename, int bytesize) throws ioexception, interruptedexception { list<string> parts = new arraylist<string>(); file file = new file(filename); int count = (int) math.ceil(file.length() / (double) bytesize); int countlen = (count + "").length(); randomaccessfile raf = new randomaccessfile(filename, "r"); long totallen = raf.length(); countdownlatch latch = new countdownlatch(count); for (int i = 0; i < count; i++) { string partfilename = file.getpath() + "." 
+ leftpad((i + 1) + "", countlen, '0') + ".cvs"; int readsize=bytesize; long startpos=(long)i * bytesize; long nextpos=(long)(i+1) * bytesize; if(nextpos>totallen){ readsize= (int) (totallen-startpos); } new splitrunnable(readsize, startpos, partfilename, file, latch).run(); parts.add(partfilename); } latch.await();//等待所有文件写完 //由于切割时可能会导致行被切断,加工所有的的分割文件,合并行 mergerow(parts); return parts; } /** * 分割处理runnable * * @author supeidong */ private class splitrunnable implements runnable { int bytesize; string partfilename; file originfile; long startpos; countdownlatch latch; public splitrunnable(int bytesize, long startpos, string partfilename, file originfile, countdownlatch latch) { this.startpos = startpos; this.bytesize = bytesize; this.partfilename = partfilename; this.originfile = originfile; this.latch = latch; } public void run() { randomaccessfile rfile; outputstream os; try { rfile = new randomaccessfile(originfile, "r"); byte[] b = new byte[bytesize]; rfile.seek(startpos);// 移动指针到每“段”开头 int s = rfile.read(b); os = new fileoutputstream(partfilename); os.write(b, 0, s); os.flush(); os.close(); latch.countdown(); } catch (ioexception e) { log.error(e.getmessage()); latch.countdown(); } } } /** * 合并被切断的行 * * @param parts */ private void mergerow(list<string> parts) { list<partfile> partfiles = new arraylist<partfile>(); try { //组装被切分表对象 for (int i=0;i<parts.size();i++) { string partfilename=parts.get(i); file splitfiletemp = new file(partfilename); if (splitfiletemp.exists()) { partfile partfile = new partfile(); bufferedreader reader=new bufferedreader(new inputstreamreader(new fileinputstream(splitfiletemp),"gbk")); string firstrow = reader.readline(); string secondrow = reader.readline(); string endrow = readlastline(partfilename); partfile.setpartfilename(partfilename); partfile.setfirstrow(firstrow); partfile.setendrow(endrow); if(i>=1){ string prepartfile=parts.get(i - 1); string preendrow = readlastline(prepartfile); 
partfile.setfirstisfull(getcharcount(firstrow+preendrow)>getcharcount(secondrow)); } partfiles.add(partfile); reader.close(); } } //进行需要合并的行的写入 for (int i = 0; i < partfiles.size() - 1; i++) { partfile partfile = partfiles.get(i); partfile partfilenext = partfiles.get(i + 1); stringbuilder sb = new stringbuilder(); if (partfilenext.getfirstisfull()) { sb.append("\r\n"); sb.append(partfilenext.getfirstrow()); } else { sb.append(partfilenext.getfirstrow()); } writelastline(partfile.getpartfilename(),sb.tostring()); } } catch (exception e) { log.error(e.getmessage()); } } /** * 得到某个字符出现的次数 * @param s * @return */ private int getcharcount(string s) { int count = 0; for (int i = 0; i < s.length(); i++) { if (s.charat(i) == cvsseparator) { count++; } } return count; } /** * 采用bufferedinputstream方式读取文件行数 * * @param filename * @return */ public int getfilerow(string filename) throws ioexception { inputstream is = new bufferedinputstream(new fileinputstream(filename)); byte[] c = new byte[1024]; int count = 0; int readchars = 0; while ((readchars = is.read(c)) != -1) { for (int i = 0; i < readchars; ++i) { if (c[i] == '\n') ++count; } } is.close(); return count; } /** * 读取最后一行数据 * @param filename * @return * @throws ioexception */ private string readlastline(string filename) throws ioexception { // 使用randomaccessfile , 从后找最后一行数据 randomaccessfile raf = new randomaccessfile(filename, "r"); long len = raf.length(); string lastline = ""; if(len!=0l) { long pos = len - 1; while (pos > 0) { pos--; raf.seek(pos); if (raf.readbyte() == '\n') { lastline = raf.readline(); lastline=new string(lastline.getbytes("8859_1"), "gbk"); break; } } } raf.close(); return lastline; } /** * 修改最后一行数据 * @param filename * @param laststring * @return * @throws ioexception */ private void writelastline(string filename,string laststring){ try { // 打开一个随机访问文件流,按读写方式 randomaccessfile randomfile = new randomaccessfile(filename, "rw"); // 文件长度,字节数 long filelength = randomfile.length(); //将写文件指针移到文件尾。 
randomfile.seek(filelength); //此处必须加gbk,否则会出现写入乱码 randomfile.write(laststring.getbytes("gbk")); randomfile.close(); } catch (ioexception e) { log.error(e.getmessage()); } } /** * 左填充 * * @param str * @param length * @param ch * @return */ public static string leftpad(string str, int length, char ch) { if (str.length() >= length) { return str; } char[] chs = new char[length]; arrays.fill(chs, ch); char[] src = str.tochararray(); system.arraycopy(src, 0, chs, length - src.length, src.length); return new string(chs); } /** * 合并文件行内部类 */ class partfile { private string partfilename; private string firstrow; private string endrow; private boolean firstisfull; public string getpartfilename() { return partfilename; } public void setpartfilename(string partfilename) { this.partfilename = partfilename; } public string getfirstrow() { return firstrow; } public void setfirstrow(string firstrow) { this.firstrow = firstrow; } public string getendrow() { return endrow; } public void setendrow(string endrow) { this.endrow = endrow; } public boolean getfirstisfull() { return firstisfull; } public void setfirstisfull(boolean firstisfull) { this.firstisfull = firstisfull; } } }
以上就是本文的全部内容,希望对大家学习Java程序设计有所帮助。
上一篇: thinkPHP显示不出验证码的原因与解决方法分析
下一篇: Yii框架分页实现方法详解