欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识。
您现在的位置是: 首页

java逐行读取超大文件

程序员文章站 2022-04-24 11:32:59
...

之前写了一个读取超大 JSON 文件的封装,但有些情况下需要逐行读取文件,因此这里提供一个逐行读取超大文件的封装。如需读取超大 JSON 文件,请点击传送门。

代码中写了详细的注释,这里就不再详细介绍,直接上代码(代码参考自 https://www.cnblogs.com/metoy/p/4470418.html,略有改动):

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Reads a very large text file line by line using a pool of worker threads.
 *
 * <p>The file is cut into slices that always end on a line terminator
 * ({@code '\n'} or {@code '\r'}) or at the end of the file. Each slice is
 * memory-mapped and scanned by its own task, and every non-empty line is
 * decoded and passed to the configured {@link IHandle}. Lines of different
 * slices are delivered concurrently and in no particular order, so the
 * handler must be safe to call from several threads.
 *
 * <p>{@link #start()} only submits the work and returns; when the last slice
 * has been processed the file is closed and the thread pool is shut down.
 * An instance is meant to be started once. Instances are created through
 * {@link Builder}.
 */
public class BigFileReader {
    private final int threadSize;
    private final String charset;
    private final int bufferSize;
    private final IHandle handle;
    private final ExecutorService executorService;
    private final long fileLength;
    private final RandomAccessFile rAccessFile;
    private final Set<StartEndPair> startEndPairs;
    // Number of non-empty lines handed to the handler so far.
    private final AtomicLong counter = new AtomicLong(0);
    // Number of slices whose task has not finished yet.
    private final AtomicLong pendingSlices = new AtomicLong(0);
    // Wall-clock time at which the slice tasks were submitted.
    private long startTime;

    /**
     * Creates a reader; use {@link Builder} from outside this class.
     *
     * @param file       the file to read
     * @param handle     callback that receives every non-empty line
     * @param charset    charset name used to decode lines, or {@code null} for the platform default
     * @param bufferSize size in bytes of the per-task read buffer, must be positive
     * @param threadSize number of worker threads, must be positive
     * @throws IllegalArgumentException if an argument is invalid or the file cannot be opened
     */
    private BigFileReader(File file, IHandle handle, String charset, int bufferSize, int threadSize) {
        if (handle == null) {
            throw new IllegalArgumentException("handle must not be null");
        }
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
        }
        if (threadSize <= 0) {
            throw new IllegalArgumentException("threadSize must be positive: " + threadSize);
        }
        this.fileLength = file.length();
        this.handle = handle;
        this.charset = charset;
        this.bufferSize = bufferSize;
        this.threadSize = threadSize;
        try {
            this.rAccessFile = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException e) {
            // Fail fast instead of leaving a half-built reader that would throw
            // a NullPointerException later.
            throw new IllegalArgumentException("cannot open file for reading: " + file, e);
        }
        this.executorService = Executors.newFixedThreadPool(threadSize);
        this.startEndPairs = new HashSet<BigFileReader.StartEndPair>();
    }

    /**
     * Slices the file and submits one reading task per slice.
     *
     * <p>This method does not wait for the tasks. Once every slice has been
     * processed, the elapsed time and the line count are printed and the
     * file and the thread pool are released. If slicing fails, the error is
     * printed and the resources are released immediately.
     */
    public void start() {
        // At least one byte per slice, so a file shorter than threadSize still works.
        long everySize = Math.max(1L, this.fileLength / this.threadSize);
        try {
            calculateStartEnd(0, everySize);
        } catch (IOException e) {
            e.printStackTrace();
            shutdown();
            return;
        }

        startTime = System.currentTimeMillis();
        if (startEndPairs.isEmpty()) {
            // Empty file: nothing to read, just report and release resources.
            finish();
            return;
        }

        // Must be set before the first task is submitted so that the counter
        // can only reach zero after every slice is done.
        pendingSlices.set(startEndPairs.size());
        for (StartEndPair pair : startEndPairs) {
            System.out.println("分配分片:" + pair);
            this.executorService.execute(new SliceReaderTask(pair));
        }
    }

    /**
     * Splits the range from {@code start} to the end of the file into slices
     * of roughly {@code size} bytes and stores them in {@code startEndPairs}.
     * Every slice but the last is extended until its final byte is a line
     * terminator, so no line is cut in the middle.
     *
     * @param start offset of the first byte to slice
     * @param size  nominal slice size in bytes, must be at least 1
     * @throws IOException if the file cannot be read
     */
    private void calculateStartEnd(long start, long size) throws IOException {
        final long lastIndex = fileLength - 1;
        long sliceStart = start;
        while (sliceStart <= lastIndex) {
            StartEndPair pair = new StartEndPair();
            pair.start = sliceStart;
            long endPosition = sliceStart + size - 1;
            // Last slice: it simply runs to the end of the file.
            if (endPosition >= lastIndex) {
                pair.end = lastIndex;
                startEndPairs.add(pair);
                return;
            }
            // The nominal end may fall inside a line; advance it to the end of that line.
            rAccessFile.seek(endPosition);
            int tmp = rAccessFile.read();
            while (tmp != '\n' && tmp != '\r') {
                endPosition++;
                if (endPosition >= lastIndex) {
                    endPosition = lastIndex;
                    break;
                }
                // read() already advanced the file pointer to endPosition.
                tmp = rAccessFile.read();
            }
            pair.end = endPosition;
            startEndPairs.add(pair);
            sliceStart = endPosition + 1;
        }
    }

    /**
     * Records that one slice task has ended and triggers the final
     * reporting and cleanup when it was the last one.
     */
    private void sliceFinished() {
        if (pendingSlices.decrementAndGet() == 0) {
            finish();
        }
    }

    /**
     * Prints the elapsed time and the number of handled lines, then releases resources.
     */
    private void finish() {
        System.out.println("use time: " + (System.currentTimeMillis() - startTime));
        System.out.println("all line: " + counter.get());
        shutdown();
    }

    /**
     * Closes the file and shuts the thread pool down.
     */
    private void shutdown() {
        try {
            this.rAccessFile.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.executorService.shutdown();
    }

    /**
     * Decodes the bytes of one line and passes the text to the handler.
     * Empty lines are skipped and not counted.
     *
     * @param bytes raw bytes of the line, without line terminator
     * @throws UnsupportedEncodingException if the configured charset is not supported
     */
    private void handle(byte[] bytes) throws UnsupportedEncodingException {
        String line;
        if (this.charset == null) {
            line = new String(bytes);
        } else {
            line = new String(bytes, charset);
        }
        if (!line.isEmpty()) {
            this.handle.handle(line);
            counter.incrementAndGet();
        }
    }

    /**
     * Inclusive byte range of one slice of the file.
     */
    private static class StartEndPair {
        // Offset of the first byte of the slice.
        public long start;
        // Offset of the last byte of the slice (inclusive).
        public long end;

        @Override
        public String toString() {
            return "star=" + start + ";end=" + end;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + (int) (end ^ (end >>> 32));
            result = prime * result + (int) (start ^ (start >>> 32));
            return result;
        }

        /**
         * Two pairs are equal when they describe the same byte range.
         *
         * @param obj the object to compare with
         * @return {@code true} if {@code obj} is a pair with the same start and end
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            StartEndPair other = (StartEndPair) obj;
            return end == other.end && start == other.start;
        }
    }


    /**
     * Task that reads one slice of the file and feeds its lines to the handler.
     */
    private class SliceReaderTask implements Runnable {
        private final long start;
        private final long sliceSize;

        public SliceReaderTask(StartEndPair pair) {
            this.start = pair.start;
            this.sliceSize = pair.end - pair.start + 1;
        }

        @Override
        public void run() {
            try {
                readSlice();
            } catch (Exception e) {
                // Task boundary: also covers runtime exceptions thrown by the handler.
                e.printStackTrace();
            } finally {
                // Always report completion, otherwise a failing slice would
                // keep the file and the thread pool open forever.
                sliceFinished();
            }
        }

        /**
         * Scans the slice and emits every line found in it.
         *
         * @throws IOException if mapping the file or decoding a line fails
         */
        private void readSlice() throws IOException {
            byte[] readBuff = new byte[bufferSize];
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            long position = start;
            long remaining = sliceSize;
            while (remaining > 0) {
                // A single mapping cannot exceed Integer.MAX_VALUE bytes, so a
                // larger slice is mapped window by window.
                long windowSize = Math.min(remaining, (long) Integer.MAX_VALUE);
                MappedByteBuffer mapBuffer =
                        rAccessFile.getChannel().map(MapMode.READ_ONLY, position, windowSize);
                while (mapBuffer.hasRemaining()) {
                    int readLength = Math.min(mapBuffer.remaining(), readBuff.length);
                    mapBuffer.get(readBuff, 0, readLength);
                    for (int i = 0; i < readLength; i++) {
                        byte tmp = readBuff[i];
                        if (tmp == '\n' || tmp == '\r') {
                            handle(bos.toByteArray());
                            bos.reset();
                        } else {
                            bos.write(tmp);
                        }
                    }
                }
                position += windowSize;
                remaining -= windowSize;
            }
            // Last line of the slice when it is not followed by a terminator.
            if (bos.size() > 0) {
                handle(bos.toByteArray());
            }
        }

    }

    /**
     * Builder for {@link BigFileReader}.
     *
     * <p>Defaults: one thread, platform default charset, 1 MiB read buffer.
     */
    public static class Builder {
        private int threadSize = 1;
        private String charset = null;
        private int bufferSize = 1024 * 1024;
        private IHandle handle;
        private File file;

        /**
         * @param file   path of the file to read
         * @param handle callback that receives every non-empty line
         * @throws IllegalArgumentException if the file does not exist
         */
        public Builder(String file, IHandle handle) {
            this.file = new File(file);
            if (!this.file.exists()) {
                throw new IllegalArgumentException("文件不存在!");
            }
            this.handle = handle;
        }

        /**
         * Sets the number of worker threads.
         *
         * @param size number of threads, must be positive
         * @return this builder
         */
        public Builder withThreadSize(int size) {
            this.threadSize = size;
            return this;
        }

        /**
         * Misspelled alias of {@link #withThreadSize(int)}, kept so existing callers still compile.
         *
         * @param size number of threads, must be positive
         * @return this builder
         * @deprecated use {@link #withThreadSize(int)}
         */
        @Deprecated
        public Builder withTreahdSize(int size) {
            return withThreadSize(size);
        }

        /**
         * Sets the charset used to decode lines.
         *
         * @param charset charset name, or {@code null} for the platform default
         * @return this builder
         */
        public Builder withCharset(String charset) {
            this.charset = charset;
            return this;
        }

        /**
         * Sets the size of the per-task read buffer.
         *
         * @param bufferSize buffer size in bytes, must be positive
         * @return this builder
         */
        public Builder withBufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
            return this;
        }

        /**
         * Creates the reader with the configured settings.
         *
         * @return a new reader, not yet started
         * @throws IllegalArgumentException if a setting is invalid or the file cannot be opened
         */
        public BigFileReader build() {
            return new BigFileReader(this.file, this.handle, this.charset, this.bufferSize, this.threadSize);
        }
    }


}
/**
 * Callback for processing the lines that are read, one line per call.
 */
public interface IHandle {

    /**
     * Processes a single line.
     *
     * @param line the text of the line
     */
    void handle(String line);
}

测试代码:

	public static void main(String[] args) {
		// Handler that echoes every line to standard output.
		IHandle printer = new IHandle() {
			@Override
			public void handle(String line) {
				System.out.println(line);
			}
		};
		// 10 worker threads, GBK decoding, 1 MiB read buffer.
		BigFileReader bigFileReader = new BigFileReader.Builder("d:/bigFile.txt", printer)
				.withTreahdSize(10)
				.withCharset("gbk")
				.withBufferSize(1024 * 1024)
				.build();
		// The reader closes the file by itself once reading has finished.
		bigFileReader.start();
	}

 

 

相关标签: java