Springboot项目redisTemplate实现轻量级消息队列
程序员文章站
2023-11-22 23:50:16
背景 公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用ActiveMQ等消息队列中间件, 而redis的lpush和rpop ......
背景
公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用activemq等消息队列中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发
一、本文涉及知识点
- excel文件读写--阿里easyexcel sdk
- 文件上传、下载--腾讯云对象存储
- 远程服务调用--resttemplate
- 生产者、消费者--redistemplate leftpush和rightpop操作
- 异步处理数据--executors线程池
- 读取网络文件流--httpclient
- 自定义注解实现用户身份认证--jwt token认证, 拦截器拦截标注有@loginrequired注解的请求入口
当然, java实现咯
涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习
二、项目目录结构
说明: 数据库dao层放到另一个模块了, 不是本文重点
三、主要maven依赖
- easyexcel
<easyexcel-latestversion>1.1.2-beta4</easyexcel-latestversion> <dependency> <groupid>com.alibaba</groupid> <artifactid>easyexcel</artifactid> <version>${easyexcel-latestversion}</version> </dependency>
- jwt
<dependency> <groupid>io.jsonwebtoken</groupid> <artifactid>jjwt</artifactid> <version>0.7.0</version> </dependency>
- redis
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-redis</artifactid> <version>1.3.5.release</version> </dependency>
- 腾讯cos
<dependency> <groupid>com.qcloud</groupid> <artifactid>cos_api</artifactid> <version>5.4.5</version> </dependency>
四、流程
- 用户上传文件
- 将文件存储到腾讯cos
- 将上传后的文件id及上传记录保存到数据库
- redis生产一条导入消息, 即保存文件id到redis
- 请求结束, 返回"处理中"状态
- redis消费消息
- 读取cos文件, 异步处理数据
- 将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为"处理完成"
- 客户端轮询查询处理状态, 并可以下载错误文件
- 结束
五、实现效果
上传文件
数据库导入记录
导入的数据
下载错误文件
错误数据提示
查询导入记录
六、代码实现
1、导入excel控制层
@loginrequired @requestmapping(value = "doimport", method = requestmethod.post) public jsonresponse doimport(@requestparam("file") multipartfile file, httpservletrequest request) { pluser user = getuser(request); return orderimportservice.doimport(file, user.getid()); }
2、service层
@override public jsonresponse doimport(multipartfile file, integer userid) { if (null == file || file.isempty()) { throw new serviceexception("文件不能为空"); } string filename = file.getoriginalfilename(); if (!checkfilesuffix(filename)) { throw new serviceexception("当前仅支持xlsx格式的excel"); } // 存储文件 string fileid = savetooss(file); if (stringutils.isblank(fileid)) { throw new serviceexception("文件上传失败, 请稍后重试"); } // 保存记录到数据库 saverecordtodb(userid, fileid, filename); // 生产一条订单导入消息 redisproducer.produce(rediskey.orderimportkey, fileid); return jsonresponse.ok("导入成功, 处理中..."); } /** * 校验文件格式 * @param filename * @return */ private static boolean checkfilesuffix(string filename) { if (stringutils.isblank(filename) || filename.lastindexof(".") <= 0) { return false; } int pointindex = filename.lastindexof("."); string suffix = filename.substring(pointindex, filename.length()).tolowercase(); if (".xlsx".equals(suffix)) { return true; } return false; } /** * 将文件存储到腾讯oss * @param file * @return */ private string savetooss(multipartfile file) { inputstream ins = null; try { ins = file.getinputstream(); } catch (ioexception e) { e.printstacktrace(); } string fileid; try { string originalfilename = file.getoriginalfilename(); file f = new file(originalfilename); inputstreamtofile(ins, f); filesystemresource resource = new filesystemresource(f); multivaluemap<string, object> param = new linkedmultivaluemap<>(); param.add("file", resource); responseresult responseresult = resttemplate.postforobject(txossuploadurl, param, responseresult.class); fileid = (string) responseresult.getdata(); } catch (exception e) { fileid = null; } return fileid; }
3、redis生产者
@service public class redisproducerimpl implements redisproducer { @autowired private redistemplate redistemplate; @override public jsonresponse produce(string key, string msg) { map<string, string> map = maps.newhashmap(); map.put("fileid", msg); redistemplate.opsforlist().leftpush(key, map); return jsonresponse.ok(); } }
4、redis消费者
@service public class redisconsumer { @autowired public redistemplate redistemplate; @value("${txossfileurl}") private string txossfileurl; @value("${txossuploadurl}") private string txossuploadurl; @postconstruct public void init() { processorderimport(); } /** * 处理订单导入 */ private void processorderimport() { executorservice executorservice = executors.newcachedthreadpool(); executorservice.execute(() -> { while (true) { object object = redistemplate.opsforlist().rightpop(rediskey.orderimportkey, 1, timeunit.seconds); if (null == object) { continue; } string msg = json.tojsonstring(object); executorservice.execute(new orderimporttask(msg, txossfileurl, txossuploadurl)); } }); } }
5、处理任务线程类
public class orderimporttask implements runnable { public orderimporttask(string msg, string txossfileurl, string txossuploadurl) { this.msg = msg; this.txossfileurl = txossfileurl; this.txossuploadurl = txossuploadurl; } } /** * 注入bean */ private void autowirebean() { this.resttemplate = beancontext.getapplicationcontext().getbean(resttemplate.class); this.transactiontemplate = beancontext.getapplicationcontext().getbean(transactiontemplate.class); this.orderimportservice = beancontext.getapplicationcontext().getbean(orderimportservice.class); } @override public void run() { // 注入bean autowirebean(); jsonobject jsonobject = json.parseobject(msg); string fileid = jsonobject.getstring("fileid"); multivaluemap<string, object> param = new linkedmultivaluemap<>(); param.add("id", fileid); responseresult responseresult = resttemplate.postforobject(txossfileurl, param, responseresult.class); string fileurl = (string) responseresult.getdata(); if (stringutils.isblank(fileurl)) { return; } inputstream inputstream = httpclientutil.readfilefromurl(fileurl); list<object> list = excelutil.read(inputstream); process(list, fileid); } /** * 将文件上传至oss * @param file * @return */ private string savetooss(file file) { string fileid; try { filesystemresource resource = new filesystemresource(file); multivaluemap<string, object> param = new linkedmultivaluemap<>(); param.add("file", resource); responseresult responseresult = resttemplate.postforobject(txossuploadurl, param, responseresult.class); fileid = (string) responseresult.getdata(); } catch (exception e) { fileid = null; } return fileid; }
说明: 处理数据的业务逻辑代码就不用贴了
6、上传文件到cos
@requestmapping("/txossupload") @responsebody public responseresult txossupload(@requestparam("file") multipartfile file) throws unsupportedencodingexception { if (null == file || file.isempty()) { return responseresult.fail("文件不能为空"); } string originalfilename = file.getoriginalfilename(); originalfilename = mimeutility.decodetext(originalfilename);// 解决中文乱码问题 string contenttype = getcontenttype(originalfilename); string key; inputstream ins = null; file f = null; try { ins = file.getinputstream(); f = new file(originalfilename); inputstreamtofile(ins, f); key = ifilestorageclient.txossupload(new fileinputstream(f), originalfilename, contenttype); } catch (exception e) { return responseresult.fail(e.getmessage()); } finally { if (null != ins) { try { ins.close(); } catch (ioexception e) { e.printstacktrace(); } } if (f.exists()) {// 删除临时文件 f.delete(); } } return responseresult.ok(key); } public static void inputstreamtofile(inputstream ins,file file) { try { outputstream os = new fileoutputstream(file); int bytesread = 0; byte[] buffer = new byte[8192]; while ((bytesread = ins.read(buffer, 0, 8192)) != -1) { os.write(buffer, 0, bytesread); } os.close(); ins.close(); } catch (exception e) { e.printstacktrace(); } } public string txossupload(fileinputstream inputstream, string key, string contenttype) { key = uuid.getuuid() + "-" + key; ossutil.txossupload(inputstream, key, contenttype); try { if (null != inputstream) { inputstream.close(); } } catch (ioexception e) { e.printstacktrace(); } return key; } public static void txossupload(fileinputstream inputstream, string key, string contenttype) { objectmetadata objectmetadata = new objectmetadata(); try{ int length = inputstream.available(); objectmetadata.setcontentlength(length); }catch (exception e){ logger.info(e.getmessage()); } objectmetadata.setcontenttype(contenttype); cosclient.putobject(txbucketname, key, inputstream, objectmetadata); }
7、下载文件
/** * 腾讯云文件下载 * @param response * @param id * @return */ @requestmapping("/txossdownload") public object txossdownload(httpservletresponse response, string id) { cosobjectinputstream cosobjectinputstream = ifilestorageclient.txossdownload(id, response); string contenttype = getcontenttype(id); fileutil.txossdownload(response, contenttype, cosobjectinputstream, id); return null; } public static void txossdownload(httpservletresponse response, string contenttype, inputstream filestream, string filename) { fileoutputstream fos = null; response.reset(); outputstream os = null; try { response.setcontenttype(contenttype + "; charset=utf-8"); if(!contenttype.equals(plconstans.filecontenttype.image)){ try { response.setheader("content-disposition", "attachment; filename=" + new string(filename.getbytes("utf-8"), "iso8859-1")); } catch (unsupportedencodingexception e) { response.setheader("content-disposition", "attachment; filename=" + filename); logger.error("encoding file name failed", e); } } os = response.getoutputstream(); byte[] b = new byte[1024 * 1024]; int len; while ((len = filestream.read(b)) > 0) { os.write(b, 0, len); os.flush(); try { if(fos != null) { fos.write(b, 0, len); fos.flush(); } } catch (exception e) { logger.error(e.getmessage()); } } } catch (ioexception e) { ioutils.closequietly(fos); fos = null; } finally { ioutils.closequietly(os); ioutils.closequietly(filestream); if(fos != null) { ioutils.closequietly(fos); } } }
8、读取网络文件流
/** * 读取网络文件流 * @param url * @return */ public static inputstream readfilefromurl(string url) { if (stringutils.isblank(url)) { return null; } httpclient httpclient = new defaulthttpclient(); httpget methodget = new httpget(url); try { httpresponse response = httpclient.execute(methodget); if (response.getstatusline().getstatuscode() == 200) { httpentity entity = response.getentity(); return entity.getcontent(); } } catch (exception e) { e.printstacktrace(); } return null; }
9、excelutil
/** * 读excel * @param inputstream 文件输入流 * @return list集合 */ public static list<object> read(inputstream inputstream) { return easyexcelfactory.read(inputstream, new sheet(1, 1)); } /** * 写excel * @param data list数据 * @param clazz * @param savefilepath 文件保存路径 * @throws ioexception */ public static void write(list<? extends baserowmodel> data, class<? extends baserowmodel> clazz, string savefilepath) throws ioexception { file tempfile = new file(savefilepath); outputstream out = new fileoutputstream(tempfile); excelwriter writer = easyexcelfactory.getwriter(out); sheet sheet = new sheet(1, 3, clazz, "sheet1", null); writer.write(data, sheet); writer.finish(); out.close(); }
说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考
七、其他
1、@loginrequired注解
/** * 在需要登录验证的controller的方法上使用此注解 */ @target({elementtype.method}) @retention(retentionpolicy.runtime) public @interface loginrequired { }
2、mycontrolleradvice
@controlleradvice public class mycontrolleradvice { @responsebody @exceptionhandler(tokenvalidationexception.class) public jsonresponse tokenvalidationexceptionhandler() { return jsonresponse.logininvalid(); } @responsebody @exceptionhandler(serviceexception.class) public jsonresponse serviceexceptionhandler(serviceexception se) { return jsonresponse.fail(se.getmsg()); } @responsebody @exceptionhandler(exception.class) public jsonresponse exceptionhandler(exception e) { e.printstacktrace(); return jsonresponse.fail(e.getmessage()); } }
3、authenticationinterceptor
public class authenticationinterceptor implements handlerinterceptor { private static final string current_user = "user"; @autowired private userservice userservice; @override public boolean prehandle(httpservletrequest request, httpservletresponse response, object handler) { // 如果不是映射到方法直接通过 if (!(handler instanceof handlermethod)) { return true; } handlermethod handlermethod = (handlermethod) handler; method method = handlermethod.getmethod(); // 判断接口是否有@loginrequired注解, 有则需要登录 loginrequired methodannotation = method.getannotation(loginrequired.class); if (methodannotation != null) { // 验证token integer userid = jwtutil.verifytoken(request); pluser pluser = userservice.selectbyprimarykey(userid); if (null == pluser) { throw new runtimeexception("用户不存在,请重新登录"); } request.setattribute(current_user, pluser); return true; } return true; } @override public void posthandle(httpservletrequest httpservletrequest, httpservletresponse httpservletresponse, object o, modelandview modelandview) throws exception { } @override public void aftercompletion(httpservletrequest httpservletrequest, httpservletresponse httpservletresponse, object o, exception e) throws exception { } }
4、jwtutil
public static final long expiration_time = 2592_000_000l; // 有效期30天 public static final string secret = "pl_token_secret"; public static final string header = "token"; public static final string user_id = "userid"; /** * 根据userid生成token * @param userid * @return */ public static string generatetoken(string userid) { hashmap<string, object> map = new hashmap<>(); map.put(user_id, userid); string jwt = jwts.builder() .setclaims(map) .setexpiration(new date(system.currenttimemillis() + expiration_time)) .signwith(signaturealgorithm.hs512, secret) .compact(); return jwt; } /** * 验证token * @param request * @return 验证通过返回userid */ public static integer verifytoken(httpservletrequest request) { string token = request.getheader(header); if (token != null) { try { map<string, object> body = jwts.parser() .setsigningkey(secret) .parseclaimsjws(token) .getbody(); for (map.entry entry : body.entryset()) { object key = entry.getkey(); object value = entry.getvalue(); if (key.tostring().equals(user_id)) { return integer.valueof(value.tostring());// userid } } return null; } catch (exception e) { logger.error(e.getmessage()); throw new tokenvalidationexception("unauthorized"); } } else { throw new tokenvalidationexception("missing token"); } }
结语: ok, 搞定,睡了, 好困
推荐阅读
-
Springboot项目redisTemplate实现轻量级消息队列
-
Springboot项目redisTemplate实现轻量级消息队列
-
springboot项目中使用netty+websocket 实现消息推送(带校验用户是否登陆功能)
-
springboot 整合 RabbitMQ实现消息队列
-
springboot整合rabbitmq实现生产者消息确认、死信交换器、未路由到队列的消息处理
-
springboot整合rabbitmq实现生产者消息确认、死信交换器、未路由到队列的消息处理
-
Java ActiveMQ简介以及springboot集成activeMQ实现消息队列监听以及实现MQ延迟
-
docker 构建git+maven+jdk8的centos7环境,实现轻量级的springboot项目的自动化部署
-
13、Springboot集成Kafka,最简化消息队列通信的实现
-
springboot 整合 RabbitMQ实现消息队列