欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识。
您现在的位置是: 首页

Java nio导入csv对账文件 多线程java

程序员文章站 2022-07-12 19:51:42
...

公司业务需要实现上传支付宝、微信交易记录,并与系统进行对账的功能。

个人使用了 Java 的 NIO 和多线程来扫描文件并装载 bean 对象,具体代码如下:

附件中有对应的工具类,但一直上传失败,所以放到了百度云盘,有需要的可以去下载:链接: https://pan.baidu.com/s/1y-I36iUAQbx2_Ss1Ih8ASQ 提取码: meng

 

/**
 * Accepts an uploaded reconciliation CSV for the given payment channel and starts
 * parsing it in the background.
 *
 * <p>Flow:
 * <ol>
 *   <li>Reject the request when the file is missing/empty, or when
 *       {@code uploadPayDataService} already returns records for the same file name.</li>
 *   <li>Use the MD5 of the file name as a Redis marker key. If the key already has a
 *       value the file is reported as being processed; otherwise the key is set to
 *       {@code "PROCESSING"}.</li>
 *   <li>On a background thread, split the file with {@link BigFileReader} and push each
 *       non-header line (lines not containing "渠道") as JSON onto the Redis list named
 *       after the file, then call {@code upload(file)}. If that work throws, the marker
 *       key is deleted.</li>
 *   <li>Meanwhile, count the lines of the upload on the request thread and return the
 *       count together with the target URL and file name.</li>
 * </ol>
 *
 * @param file    the uploaded CSV; it is decoded as GBK
 * @param channel payment channel taken from the URL path, forwarded to {@code setItem}
 * @return a success response carrying an {@code UploadPayDataDto}, or an error response
 * @throws IOException if the uploaded content cannot be read while counting lines
 */
@RequestMapping(value = "checkReconciliationData/{channel}", method = RequestMethod.POST)
public CResponse checkReconciliationDataNew(@RequestParam("file") MultipartFile file, @PathVariable String channel)throws IOException {
    if (file == null || file.isEmpty()) {
        return CResponse.error(ResultCodeEnum.ERROR_DATA_FORMAT, "文件数据为空");
    }

    String fileName = file.getOriginalFilename();
    UploadPayDataRequest uploadPayDataRequest = new UploadPayDataRequest();
    uploadPayDataRequest.setFileName(fileName);
    List<UploadPayDataEntity> uploadPayDataEntities = uploadPayDataService.selectInfoByList(uploadPayDataRequest);
    if (uploadPayDataEntities.size() > 0) {
        return CResponse.error(ResultCodeEnum.ERROR_DATA_FORMAT, "文件已存在,无需重复上传!");
    }

    String keyName = MD5.MD5Encode(fileName);
    System.out.println("keyName:" + keyName);
    // NOTE(review): get-then-set is not atomic; two concurrent uploads of the same file
    // can both observe null here. Consider an atomic set-if-absent on the Redis client.
    String keyValue = jedisCluster.get(keyName);
    if (keyValue != null) {
        return CResponse.error(ResultCodeEnum.ERROR_DATA_FORMAT, "文件处理中");
    }
    jedisCluster.set(keyName, "PROCESSING"); // marks the file as being processed

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    try {
        executorService.execute(() -> {
            try {
                // NOTE(review): this reads the multipart temp file after the request may have
                // returned; confirm the container does not delete it before the task finishes.
                CommonsMultipartFile cf = (CommonsMultipartFile) file;
                DiskFileItem fi = (DiskFileItem) cf.getFileItem();
                File f = fi.getStoreLocation();
                BigFileReader.Builder builder = new BigFileReader.Builder(f, line -> {
                    // Lines containing "渠道" are treated as the header row and skipped.
                    if (line.indexOf("渠道") == -1) {
                        String[] strs = line.split(",");
                        String json = JSONObject.toJSONString(setItem(strs, channel, fileName));
                        jedisCluster.lpush(fileName, json);
                    }
                });
                BigFileReader bigFileReader = builder
                        .threadPoolSize(10)
                        .charset(Charset.forName("GBK"))
                        .bufferSize(1024).build();
                bigFileReader.start(jedisCluster, keyName);
                upload(file);
            } catch (Exception e) {
                // Pass the exception itself so the stack trace is not lost.
                log.error("异步操作文件内容失败,原因:" + e.getMessage(), e);
                jedisCluster.del(keyName);
            }
        });
    } finally {
        // The executor is created per request; shutdown() lets the submitted task finish
        // and then releases the worker thread instead of leaking it.
        executorService.shutdown();
    }

    UploadPayDataDto uploadPayDataDto = new UploadPayDataDto();
    // 30 MB read buffer; the reader (and the underlying stream) is closed after counting.
    try (BufferedReader bufferedReader = new BufferedReader(
            new InputStreamReader(new BufferedInputStream(file.getInputStream()), "GBK"), 30 * 1024 * 1024)) {
        uploadPayDataDto.setTotalNum(bufferedReader.lines().count());
    }
    uploadPayDataDto.setUrl(ALIYUN_IMAGE_URL + fileName);
    uploadPayDataDto.setFileName(fileName);
    return CResponse.success(uploadPayDataDto);
}



/**
 * Reads a file in parallel: the file is cut into slices that end on a line break,
 * each slice is memory-mapped and scanned by a pool thread, and every non-empty line
 * is decoded and passed to an {@link IFileHandle}.
 *
 * <p>Instances are created through {@link Builder}. {@link #start} returns as soon as
 * the slice tasks are submitted; the handler is invoked concurrently from pool threads,
 * so it must be safe to call from several threads and must not rely on line order.
 */
public class BigFileReader {
    private final int threadPoolSize;
    private final Charset charset;
    private final int bufferSize;
    private final IFileHandle handle;
    private final ExecutorService executorService;
    private final long fileLength;
    private final RandomAccessFile rAccessFile;
    private final Set<StartEndPair> startEndPairs;
    /** Number of non-empty lines handed to the handler so far. */
    private final AtomicLong counter = new AtomicLong(0);
    /** Slice tasks still running; the task that brings this to zero runs the completion step. */
    private final AtomicLong pendingSlices = new AtomicLong(0);
    /** Set when any slice task ends with an exception. */
    private volatile boolean failed;


    /**
     * @param file           file to read
     * @param handle         callback invoked once per non-empty line
     * @param charset        charset used to decode lines; {@code null} means platform default
     * @param bufferSize     size in bytes of the per-task read buffer, must be positive
     * @param threadPoolSize number of worker threads and target number of slices
     * @throws IllegalArgumentException if {@code bufferSize} is not positive, the pool size
     *                                  is rejected by the executor, or the file cannot be opened
     */
    public BigFileReader(File file, IFileHandle handle, Charset charset, int bufferSize, int threadPoolSize) {
        if (bufferSize <= 0) {
            // A zero-sized buffer would make the slice loop spin forever.
            throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
        }
        this.fileLength = file.length();
        this.handle = handle;
        this.charset = charset;
        this.bufferSize = bufferSize;
        this.threadPoolSize = threadPoolSize;
        try {
            this.rAccessFile = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException e) {
            // Fail fast instead of leaving a null file handle that breaks later in a worker.
            throw new IllegalArgumentException("cannot open file: " + file, e);
        }
        this.executorService = Executors.newFixedThreadPool(threadPoolSize);
        this.startEndPairs = new HashSet<>();
    }

    /**
     * Computes the slices and submits one task per slice, then returns without waiting.
     *
     * <p>When the last task finishes, timing and line-count information is printed and
     * the reader is shut down. If every slice completed without an exception, the Redis
     * key {@code fileName} is set to {@code "SUCCESS"}; otherwise the key is left untouched.
     * An empty file has no slices and is completed immediately.
     *
     * @param jedisCluster Redis client used to publish the completion marker
     * @param fileName     Redis key that receives the value {@code "SUCCESS"}
     */
    public void start(JedisCluster jedisCluster, String fileName) {
        // At least one byte per slice: a zero size would produce a negative seek position
        // for files shorter than the pool size.
        long everySize = Math.max(1L, this.fileLength / this.threadPoolSize);
        try {
            calculateStartEnd(0, everySize);
        } catch (IOException e) {
            e.printStackTrace();
            // Nothing was submitted; release the file handle and the pool.
            shutdown();
            return;
        }

        final long startTime = System.currentTimeMillis();
        final Runnable onAllSlicesDone = () -> {
            try {
                System.out.println("use time: " + (System.currentTimeMillis() - startTime));
                System.out.println("all line: " + counter.get());
                System.out.println(fileName);
                if (!failed) {
                    jedisCluster.set(fileName, "SUCCESS");
                }
            } finally {
                shutdown();
            }
        };

        if (startEndPairs.isEmpty()) {
            onAllSlicesDone.run();
            return;
        }

        // A countdown is used rather than a barrier: tasks must never block waiting for each
        // other, because rounding can yield one more slice than there are pool threads, and a
        // task that fails must still be accounted for.
        pendingSlices.set(startEndPairs.size());
        for (StartEndPair pair : startEndPairs) {
            System.out.println("分配分片:" + pair);
            this.executorService.execute(new SliceReaderTask(pair, onAllSlicesDone));
        }
    }

    /**
     * Fills {@code startEndPairs} with consecutive slices beginning at {@code start}.
     * Each slice is at least {@code size} bytes long and is extended forward until it
     * ends on a {@code '\n'} or {@code '\r'} byte (or at the end of the file).
     *
     * @param start offset of the first byte to cover
     * @param size  minimum slice length in bytes
     * @throws IOException if the file cannot be read while probing for line breaks
     */
    private void calculateStartEnd(long start, long size) throws IOException {
        final long lastIndex = fileLength - 1;
        long sliceStart = start;
        while (sliceStart <= lastIndex) {
            StartEndPair pair = new StartEndPair();
            pair.start = sliceStart;
            long endPosition = sliceStart + size - 1;
            if (endPosition >= lastIndex) {
                pair.end = lastIndex;
                startEndPairs.add(pair);
                return;
            }

            rAccessFile.seek(endPosition);
            int probe = rAccessFile.read();
            while (probe != '\n' && probe != '\r') {
                endPosition++;
                if (endPosition >= lastIndex) {
                    endPosition = lastIndex;
                    break;
                }
                // read() already advanced the file pointer to endPosition.
                probe = rAccessFile.read();
            }
            pair.end = endPosition;
            startEndPairs.add(pair);

            sliceStart = endPosition + 1;
        }
    }

    /** Closes the underlying file and asks the worker pool to stop once its tasks are done. */
    public void shutdown() {
        try {
            this.rAccessFile.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.executorService.shutdown();
    }

    /**
     * Decodes one line and forwards it to the handler; empty lines are ignored and not counted.
     *
     * @param bytes raw bytes of the line without its line terminator
     */
    private void handle(byte[] bytes) {
        String line = (this.charset == null) ? new String(bytes) : new String(bytes, charset);
        if (!line.isEmpty()) {
            this.handle.handle(line);
            counter.incrementAndGet();
        }
    }


    /** Inclusive byte range of one slice. */
    private static class StartEndPair {
        public long start;
        public long end;

        @Override
        public String toString() {
            return "star=" + start + ";end=" + end;
        }
    }

    /** Scans one slice, splitting it into lines on {@code '\n'} / {@code '\r'}. */
    private class SliceReaderTask implements Runnable {
        private final long start;
        private final long sliceSize;
        private final Runnable onAllSlicesDone;

        public SliceReaderTask(StartEndPair pair, Runnable onAllSlicesDone) {
            this.start = pair.start;
            this.sliceSize = pair.end - pair.start + 1;
            this.onAllSlicesDone = onAllSlicesDone;
        }


        @Override
        public void run() {
            try {
                byte[] readBuff = new byte[bufferSize];
                MappedByteBuffer mapBuffer = rAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, start, this.sliceSize);
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                // long offset: an int would overflow before reaching a large sliceSize.
                for (long offset = 0; offset < sliceSize; offset += bufferSize) {
                    int readLength = (int) Math.min((long) bufferSize, sliceSize - offset);
                    mapBuffer.get(readBuff, 0, readLength);
                    for (int i = 0; i < readLength; i++) {
                        byte tmp = readBuff[i];
                        // A line break ends the current line.
                        if (tmp == '\n' || tmp == '\r') {
                            handle(bos.toByteArray());
                            bos.reset();
                        } else {
                            bos.write(tmp);
                        }
                    }
                }
                // The slice may end without a trailing line break.
                if (bos.size() > 0) {
                    handle(bos.toByteArray());
                }
            } catch (Exception e) {
                failed = true;
                e.printStackTrace();
            } finally {
                // Always count this slice as finished, even on failure, so the completion
                // step (and resource release) is guaranteed to run exactly once.
                if (pendingSlices.decrementAndGet() == 0) {
                    try {
                        onAllSlicesDone.run();
                    } catch (RuntimeException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    /** Fluent builder for {@link BigFileReader}; defaults: 1 thread, 1 MiB buffer, platform charset. */
    public static class Builder {
        private int threadSize = 1;
        private Charset charset;
        private int bufferSize = 1024 * 1024;
        private IFileHandle handle;
        private File file;

        /**
         * @param file   file to read; must exist
         * @param handle callback invoked once per non-empty line
         * @throws IllegalArgumentException if the file does not exist
         */
        public Builder(File file, IFileHandle handle) {
            this.file = file;
            if (!this.file.exists()) {
                throw new IllegalArgumentException("文件不存在!");
            }
            this.handle = handle;
        }

        /** Sets the number of worker threads, which is also the target number of slices. */
        public Builder threadPoolSize(int size) {
            this.threadSize = size;
            return this;
        }

        /** Sets the charset used to decode each line. */
        public Builder charset(Charset charset) {
            this.charset = charset;
            return this;
        }

        /** Sets the per-task read buffer size in bytes. */
        public Builder bufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
            return this;
        }

        /** Creates the reader with the configured values. */
        public BigFileReader build() {
            return new BigFileReader(this.file, this.handle, this.charset, this.bufferSize, this.threadSize);
        }
    }

}

/**
 * Callback that receives the lines produced by {@code BigFileReader}.
 *
 * <p>It has a single abstract method, so it can be supplied as a lambda or method reference.
 */
@FunctionalInterface
public interface IFileHandle {
    /**
     * Processes one line of the file.
     *
     * @param line the decoded line, without its line terminator
     */
    void handle(String line);
}
相关标签: 多线程 java