spring异步service中处理线程数限制详解
程序员文章站
2022-03-27 17:16:32
情况简介
spring项目,controller异步调用service的方法,产生大量并发。
具体业务:
前台同时传入大量待翻译的单词,后台业务接收单词,并调...
情况简介
spring项目,controller异步调用service的方法,产生大量并发。
具体业务:
前台同时传入大量待翻译的单词,后台业务接收单词,并调用百度翻译接口翻译接收单词并将翻译结果保存到数据库,前台不需要实时返回翻译结果。
处理方式:
controller接收文本调用service中的异步方法,将单词先保存到队列中,再启动2个新线程,从缓存队列中取单词,并调用百度翻译接口获取翻译结果并将翻译结果保存到数据库。
本文主要知识点:
多线程同时(异步)调用方法后,开启新线程,并限制线程数量。
代码如下:
@service public class lgtsasyncserviceimpl { /** logger日志. */ public static final logger logger = logger.getlogger(lgtsasyncserviceimpl2.class); private final blockingqueue<lgts> que = new linkedblockingqueue<>();// 待翻译的队列 private final atomicinteger threadcnt = new atomicinteger(0);// 当前翻译中的线程数 private final vector<string> existskey = new vector<>();// 保存已入队列的数据 private final int maxthreadcnt = 2;// 允许同时执行的翻译线程数 private static final int num_of_every_time = 50;// 每次提交的翻译条数 private static final string translationfrom = "zh"; @async public void saveasync(lgts t) { if (objects.isnull(t) || stringutils.isanyblank(t.getgco(), t.getcode())) { return; } offer(t); save(); return; } private boolean offer(lgts t) { string key = t.getgco() + "-" + t.getcode(); if (!existskey.contains(key)) { existskey.add(key); boolean result = que.offer(t); // logger.trace("待翻译文字[" + t.getgco() + ":" + t.getcode() + "]加入队列结果[" + result // + "],队列中数据总个数:" + que.size()); return result; } return false; } @autowired private lgtsservice lgtsservice; private void save() { int cnt = threadcnt.incrementandget();// 当前线程数+1 if (cnt > maxthreadcnt) { // 已启动的线程大于设置的最大线程数直接丢弃 threadcnt.decrementandget();// +1的线程数再-回去 return; } gwalluser user = userutils.getuser(); thread thr = new thread() { public void run() { long sleeptime = 30000l; userutils.setuser(user); boolean continueflag = true; int maxcontinuecnt = 5;// 最大连续休眠次数,连续休眠次数超过最大休眠次数后,while循环退出,当前线程销毁 int continuecnt = 0;// 连续休眠次数 while (continueflag) {// 队列不为空时执行 if (objects.isnull(que.peek())) { try { if (continuecnt > maxcontinuecnt) { // 连续休眠次数达到最大连续休眠次数,当前线程将销毁。 continueflag = false; continue; } // 队列为空,准备休眠 thread.sleep(sleeptime); continuecnt++; continue; } catch (interruptedexception e) { // 休眠失败,无需处理 e.printstacktrace(); } } continuecnt = 0;// 重置连续休眠次数为0 list<lgts> params = new arraylist<>(); int totalcnt = que.size(); que.drainto(params, num_of_every_time); stringbuilder utf8q = new stringbuilder(); string code = ""; list<lgts> needremove = new arraylist<>(); for (lgts lgts : params) { if (stringutils.isanyblank(code)) { code = lgts.getcode(); } // 移除existskey中保存的key,以免下面翻译失败时再次加入队列时,加入不进去 string key = lgts.getgco() + "-" + lgts.getcode(); existskey.remove(key); if (!code.equalsignorecase(lgts.getcode())) {// 要翻译的目标语言与当前列表中的第一个不一致 offer(lgts);// 重新将待翻译的语言放回队列 needremove.add(lgts); continue; } utf8q.append(lgts.getgco()).append("\n"); } params.removeall(needremove); logger.debug("队列*" + totalcnt + " 个,获取" + params.size() + " 个符合条件的待翻译内容,编码:" + code); string to = "en"; if (stringutils.isanyblank(utf8q, to)) { logger.warn("调用翻译出错,未找到[" + code + "]对应的百度编码。"); continue; } map<string, string> result = getbaidutranslation(utf8q.tostring(), translationfrom, to); if (objects.isnull(result) || result.isempty()) {// 把没有获取到翻译结果的重新放回队列 for (lgts lgts : params) { offer(lgts); } logger.debug("本次翻译结果为空。"); continue; } int sucesscnt = 0, ignorecnt = 0; for (lgts lgts : params) { lgts.setbdcode(to); string gna = result.get(lgts.getgco()); if (stringutils.isanyblank(gna)) { offer(lgts);// 重新将待翻译的语言放回队列 continue; } lgts.setstat(1); lgts.setgna(gna); int saveresult = lgtsservice.saveignore(lgts); if (0 == saveresult) { ignorecnt++; } else { sucesscnt++; } } logger.debug("待翻译个数:" + params.size() + ",翻译成功个数:" + sucesscnt + ",已存在并忽略个数:" + ignorecnt); } threadcnt.decrementandget();// 运行中的线程数-1 distory();// 清理数据,必须放在方法最后,否则distory中的判断需要修改 } /** * 如果是最后一个线程,清空队列和existskey中的数据 */ private void distory() { if (0 == threadcnt.get()) { // 最后一个线程退出时,执行清理操作 existskey.clear(); que.clear(); } } }; thr.setdaemon(true);// 守护线程,如果主线程执行完毕,则此线程会自动销毁 thr.setname("baidufanyi-" + randomutils.nextint(1000, 9999)); thr.start();// 启动插入线程 } /** * 百度翻译 * * @param utf8q * 待翻译的字符串,需要utf8格式的 * @param from * 百度翻译语言列表中的代码 * 参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languagelist * @param to * 百度翻译语言列表中的代码 * 参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languagelist * @return 翻译结果 */ private map<string, string> getbaidutranslation(string utf8q, string from, string to) { map<string, string> result = new hashmap<>(); string baiduurlstr = "http://api.fanyi.baidu.com/api/trans/vip/translate"; if (stringutils.isanyblank(baiduurlstr)) { logger.warn("百度翻译api接口url相关参数为空!"); return result; } map<string, string> params = buildparams(utf8q, from, to); if (params.isempty()) { return result; } string sendurl = geturlwithquerystring(baiduurlstr, params); try { httpclient httpclient = new httpclient(); httpclient.setmethod("get"); string remoteresult = httpclient.pub(sendurl, ""); result = convertremote(remoteresult); } catch (exception e) { logger.info("百度翻译api返回结果异常!", e); } return result; } private map<string, string> convertremote(string remoteresult) { map<string, string> result = new hashmap<>(); if (stringutils.isblank(remoteresult)) { return result; } jsonobject jsonobject = jsonobject.parseobject(remoteresult); jsonarray trans_result = jsonobject.getjsonarray("trans_result"); if (objects.isnull(trans_result) || trans_result.isempty()) { return result; } for (object object : trans_result) { jsonobject trans = (jsonobject) object; result.put(trans.getstring("src"), trans.getstring("dst")); } return result; } private map<string, string> buildparams(string utf8q, string from, string to) { if (stringutils.isblank(from)) { from = "auto"; } map<string, string> params = new hashmap<string, string>(); string skstr = "sk"; string appidstr = "appid"; if (stringutils.isanyblank(skstr, appidstr)) { logger.warn("百度翻译api接口相关参数为空!"); return params; } params.put("q", utf8q); params.put("from", from); params.put("to", to); params.put("appid", appidstr); // 随机数 string salt = string.valueof(system.currenttimemillis()); params.put("salt", salt); // 签名 string src = appidstr + utf8q + salt + skstr; // 加密前的原文 params.put("sign", md5util.md5encrypt(src).tolowercase()); return params; } public static string geturlwithquerystring(string url, map<string, string> params) { if (params == null) { return url; } stringbuilder builder = new stringbuilder(url); if (url.contains("?")) { builder.append("&"); } else { builder.append("?"); } int i = 0; for (string key : params.keyset()) { string value = params.get(key); if (value == null) { // 过滤空的key continue; } if (i != 0) { builder.append('&'); } builder.append(key); builder.append('='); builder.append(encode(value)); i++; } return builder.tostring(); } /** * 对输入的字符串进行url编码, 即转换为%20这种形式 * * @param input * 原文 * @return url编码. 如果编码失败, 则返回原文 */ public static string encode(string input) { if (input == null) { return ""; } try { return urlencoder.encode(input, "utf-8"); } catch (unsupportedencodingexception e) { e.printstacktrace(); } return input; } }
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。