Golang实现异步上传文件支持进度条查询的方法
程序员文章站
2022-05-14 15:13:33
业务背景
业务需求要求开发一个异步上传文件的接口,并支持上传进度的查询。
需求分析
zip压缩包中,包含一个csv文件和一个图片文件夹,要求:解析csv数据存入mongo...
业务背景
业务需求要求开发一个异步上传文件的接口,并支持上传进度的查询。
需求分析
zip压缩包中,包含一个csv文件和一个图片文件夹,要求:解析csv数据存入mongo,将图片文件夹中的图片信息对应上csv中的人员信息。
zip压缩包解压
使用golang自带的 "archive/zip" 包解压。
func decompresszip(filepath, dest string) (string, string, error) { var csvname string imagefolder := path.base(filepath) ext := path.ext(filepath) foldername := strings.trimsuffix(imagefolder, ext) src, err := os.open(filepath) if err != nil { return "", "", err } defer src.close() zipfile, err := zip.openreader(src.name()) if err != nil { return "", "", err } defer zipfile.close() err = os.mkdirall(path.join(dest, foldername), os.modeperm) for _, innerfile := range zipfile.file { info := innerfile.fileinfo() if info.isdir() { continue } dst, err := os.create(path.join(dest, foldername, info.name())) if err != nil { fmt.println(err.error()) continue } src, err := innerfile.open() if err != nil { fmt.println(err.error()) continue } io.copy(dst, src) } destpath, err := ioutil.readdir(path.join(dest, foldername)) if err != nil { return "", "", err } for _, v := range destpath { if path.ext(v.name()) == ".csv" { csvname = path.join(dest, foldername, v.name()) } } return foldername, csvname, nil }
在这个解压的过程中,压缩包的树结构只能到2层
import.zip ┝┅┅import.csv ┖┅┅images
在解压后,所有的文件都会在同一个目录下,既images中的图片会变成和.csv文件同级
验证csv文件编码格式是否为utf-8
func validutf8(buf []byte) bool { nbytes := 0 for i := 0; i < len(buf); i++ { if nbytes == 0 { if (buf[i] & 0x80) != 0 { //与操作之后不为0,说明首位为1 for (buf[i] & 0x80) != 0 { buf[i] <<= 1 //左移一位 nbytes++ //记录字符共占几个字节 } if nbytes < 2 || nbytes > 6 { //因为utf8编码单字符最多不超过6个字节 return false } nbytes-- //减掉首字节的一个计数 } } else { //处理多字节字符 if buf[i]&0xc0 != 0x80 { //判断多字节后面的字节是否是10开头 return false } nbytes-- } } return nbytes == 0 }
后续支持utf-8转码
这个utf8编码判断方法是网上down下来的,后续优化一下
主逻辑
type linewrong struct { linenumber int64 `json:"line_number"` msg string `json:"msg"` } func import(/*自定义参数*/){ // decompress zip file to destination address folder, csvname, err := decompress(path.join(constant.folderprefix, req.filepath), dest) if err != nil { fmt.println(err.error()) } // check if the file encoding is utf8 b, err := ioutil.readfile(csvname) if err != nil { fmt.println(err.error()) } if !utils.validutf8(b) { fmt.println(errors.new("数据编码错误,请使用utf-8格式csv!")) } // create goroutine to analysis data into mongodb var wg sync.waitgroup wg.add(1) // used to interrupt goroutine resultchan := make(chan error) // used to record wrong row in csv lw := make(chan []linewrong) go func(ctx *gin.context, name, csvpath, dir, folder string) { defer wg.done() tidt, cit, lwt, err := importcsv(ctx, name, csvpath, dir, folder) resultchan <- err if err != nil { fmt.println(err.error()) } lw <- lwt if len(lwt) == 0 { importclassdata(ctx, tidt, cit) } }(ctx, req.name, csvname, dest, folder) err = <-resultchan linewrong := <-lw close(lw) ··· } // pre-analysis data in csv and through wrong data with line numbers and information func importcsv()(){ ··· } // analysis data again and save data into mongodb, if is there any error,through them same as import() func importclassdata()(){ ··· conn, err := connect() if err != nil { return err } defer conn.close() conn.do("hset", taskid, "task_id", (curline*100)/totallines) ··· }
将错误信息以channel接收,使用 package "sync" 的 sync.waitgroup 控制异步协程。在入库的过程中,将当前的进度存入redis。
查询进度接口
func queryimport()(){ conn, err := connect() if err != nil { return nil, err } defer conn.close() progress, _ := conn.do("hget", key, field) if pro, ok := progress.([]uint8); ok { ba := []byte{} for _, b := range pro { ba = append(ba, byte(b)) } progress,_ = strconv.atoi(string(ba)) } return progress }
从redis中取出来的数据是[]uint8类型数据,先断言,然后转类型返回。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。