欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

OSS分块上传实例

程序员文章站 2022-07-14 21:26:29
...

问题场景

项目需要和OSS交互,对大文件进行上传,在此记录一下。

环境

jdk8, idea,
pom.xml片段

<!-- https://mvnrepository.com/artifact/com.aliyun.oss/aliyun-sdk-oss -->
        <dependency>
            <groupId>com.aliyun.oss</groupId>
            <artifactId>aliyun-sdk-oss</artifactId>
            <version>3.3.0</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-ram</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-sts</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-ecs</artifactId>
            <version>4.2.0</version>
        </dependency>

CODE

import com.aliyun.oss.*;
import com.aliyun.oss.model.*;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author: youxingyang
 * @date: 2018/9/26 17:47
 */
@Slf4j
public final class OSSUtil {

    private String endpoint;
    private String accessKeyId;
    private String accessKeySecret;

    private static OSS client = null;

    private static String bucketName;
    private static String key;
    private static final int DEFAULT_PARALLELISM = 5;

    public OSSUtil(String endpoint, String accessKeyId, String accessKeySecret, String bucketName1, String key1) {
        if (OsUtil.isWindows()) {
            this.endpoint = endpoint.replace("-internal", "");
        } else {
            this.endpoint = endpoint;
        }
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        bucketName = bucketName1;
        key = key1;
    }

    /**
     * 分片上传文件至OSS
     * @param localFilePath     本地文件
     * @return
     */
    public boolean multipartUploadFile(String localFilePath) throws ExecutionException, InterruptedException {
        boolean res = false;
        /*
         * Constructs a client instance with your account for accessing OSS
         */
        ClientBuilderConfiguration conf = new ClientBuilderConfiguration();
        conf.setIdleConnectionTime(1000);
        client = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret, conf);

        try {
            /*
             * Claim a upload id firstly
             */
            String uploadId = claimUploadId();
            log.info("Claiming a new upload id " + uploadId + "\n");

            /*
             * Calculate how many parts to be divided
             */
            // 10MB
            final long partSize = 5 * 1024 * 1024L;
            final File sampleFile = new File(localFilePath);
            long fileLength = sampleFile.length();
            int partCount = (int) (fileLength / partSize);
            if (fileLength % partSize != 0) {
                partCount++;
            }
            if (partCount > 10000) {
                throw new RuntimeException("Total parts count should not exceed 10000");
            } else {
                log.info("Total parts count " + partCount + "\n");
            }

            /*
             * Upload multiparts to your bucket
             */
            log.info("Begin to upload multiparts to OSS from a file\n");
            List<Future<PartETag>> tmpList = Collections.synchronizedList(new ArrayList<>(partCount));
            ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_PARALLELISM);
            for (int i = 0; i < partCount; i++) {
                long startPos = i * partSize;
                long curPartSize = (i + 1 == partCount) ? (fileLength - startPos) : partSize;
                tmpList.add(executorService.submit(new PartUploader(sampleFile, startPos, curPartSize, i + 1, uploadId)));
            }

            /*
             * Waiting for all parts finished
             */
            executorService.shutdown();
            while (!executorService.isTerminated()) {
                try {
                    //executorService.awaitTermination(5, TimeUnit.SECONDS);
                    executorService.awaitTermination(1, TimeUnit.DAYS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            List<PartETag> partETags = Collections.synchronizedList(new ArrayList<PartETag>());
            for (Future<PartETag> aTmpList : tmpList) {
                if (aTmpList.isDone()) {
                    partETags.add(aTmpList.get());
                }
            }

            /*
             * Verify whether all parts are finished
             */
            if (partETags.size() != partCount) {
                throw new IllegalStateException("Upload multiparts fail due to some parts are not finished yet");
            } else {
                log.info("Succeed to complete multiparts into an object named " + key + "\n");
                res = true;
            }

            /*
             * View all parts uploaded recently
             */
            listAllParts(uploadId);

            /*
             * Complete to upload multiparts
             */
            completeMultipartUpload(uploadId, partETags);

            /*
             * Fetch the object that newly created at the step below.
             */
            log.info("Fetching an object");
            client.getObject(new GetObjectRequest(bucketName, key), new File(localFilePath));
            res = true;
        } catch (OSSException oe) {
            log.info("Caught an OSSException, which means your request made it to OSS, "
                    + "but was rejected with an error response for some reason.");
            log.info("Error Message: " + oe.getErrorCode());
            log.info("Error Code:       " + oe.getErrorCode());
            log.info("Request ID:      " + oe.getRequestId());
            log.info("Host ID:           " + oe.getHostId());
        } catch (ClientException ce) {
            log.info("Caught an ClientException, which means the client encountered "
                    + "a serious internal problem while trying to communicate with OSS, "
                    + "such as not being able to access the network.");
            log.info("Error Message: " + ce.getMessage());
        } finally {
            /*
             * Do not forget to shut down the client finally to release all allocated resources.
             */
            if (client != null) {
                client.shutdown();
            }
        }
        return res;
    }

    private static class PartUploader implements Callable<PartETag> {

        private File localFile;
        private long startPos;

        private long partSize;
        private int partNumber;
        private String uploadId;

        PartUploader(File localFile, long startPos, long partSize, int partNumber, String uploadId) {
            this.localFile = localFile;
            this.startPos = startPos;
            this.partSize = partSize;
            this.partNumber = partNumber;
            this.uploadId = uploadId;
        }

        public PartETag call() {
            PartETag partETag = null;
            InputStream instream = null;
            try {
                instream = new FileInputStream(this.localFile);
                instream.skip(this.startPos);

                UploadPartRequest uploadPartRequest = new UploadPartRequest();
                uploadPartRequest.setBucketName(bucketName);
                uploadPartRequest.setKey(key);
                uploadPartRequest.setUploadId(this.uploadId);
                uploadPartRequest.setInputStream(instream);
                uploadPartRequest.setPartSize(this.partSize);
                uploadPartRequest.setPartNumber(this.partNumber);

                UploadPartResult uploadPartResult = client.uploadPart(uploadPartRequest);
                log.info("Part#" + this.partNumber + " done\n");
                partETag = uploadPartResult.getPartETag();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (instream != null) {
                    try {
                        instream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            return partETag;
        }
    }

    private static String claimUploadId() {
        InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, key);
        InitiateMultipartUploadResult result = client.initiateMultipartUpload(request);
        return result.getUploadId();
    }

    private static void completeMultipartUpload(String uploadId, List<PartETag> partETags) {
        // Make part numbers in ascending order
        partETags.sort(Comparator.comparingInt(PartETag::getPartNumber));

        log.info("Completing to upload multiparts\n");
        CompleteMultipartUploadRequest completeMultipartUploadRequest =
                new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags);
        client.completeMultipartUpload(completeMultipartUploadRequest);
    }

    private static void listAllParts(String uploadId) {
        log.info("Listing all parts......");
        ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key, uploadId);
        PartListing partListing = client.listParts(listPartsRequest);

        int partCount = partListing.getParts().size();
        for (int i = 0; i < partCount; i++) {
            PartSummary partSummary = partListing.getParts().get(i);
            log.info("\tPart#" + partSummary.getPartNumber() + ", ETag=" + partSummary.getETag());
        }
        System.out.println();
    }

}

public static void main(String[] args) {
        // 配置文件自己写
        String bucketName = PropertiesUtil.readProperties("config.properties", "bucketName");
        String endpoint = PropertiesUtil.readProperties("config.properties", "endpoint");
        String accessKeyId = PropertiesUtil.readProperties("config.properties", "accessKeyId");
        String accessKeySecret = PropertiesUtil.readProperties("config.properties", "accessKeySecret");
        // 要传的key
        String key = "abc/testMul.txt";
        OSSUtil ossUtil = new OSSUtil(endpoint, accessKeyId, accessKeySecret, bucketName, key);
        boolean success = ossUtil.multipartUploadFile(zipName);
        if (success) {
            log.info("上传 {} 成功", zipName);
        } else {
            log.info("上传 {} 失败", zipName);
        }
    }
相关标签: OSS