欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

spring异步service中处理线程数限制详解

程序员文章站 2022-03-27 17:16:32
情况简介 spring项目,controller异步调用service的方法,产生大量并发。 具体业务: 前台同时传入大量待翻译的单词,后台业务接收单词,并调...

情况简介

spring项目,controller异步调用service的方法,产生大量并发。

具体业务:

前台同时传入大量待翻译的单词,后台业务接收单词,并调用百度翻译接口翻译接收单词并将翻译结果保存到数据库,前台不需要实时返回翻译结果。

处理方式:

controller接收文本调用service中的异步方法,将单词先保存到队列中,再启动2个新线程,从缓存队列中取单词,并调用百度翻译接口获取翻译结果并将翻译结果保存到数据库。

本文主要知识点:

多线程同时(异步)调用方法后,开启新线程,并限制线程数量。

代码如下:

@service
public class lgtsasyncserviceimpl {
 /** logger日志. */
 public static final logger logger = logger.getlogger(lgtsasyncserviceimpl2.class);

 private final blockingqueue<lgts> que = new linkedblockingqueue<>();// 待翻译的队列
 private final atomicinteger threadcnt = new atomicinteger(0);// 当前翻译中的线程数
 private final vector<string> existskey = new vector<>();// 保存已入队列的数据
 private final int maxthreadcnt = 2;// 允许同时执行的翻译线程数
 private static final int num_of_every_time = 50;// 每次提交的翻译条数
 private static final string translationfrom = "zh";

 @async
 public void saveasync(lgts t) {
  if (objects.isnull(t) || stringutils.isanyblank(t.getgco(), t.getcode())) {
   return;
  }
  offer(t);
  save();
  return;
 }

 private boolean offer(lgts t) {
  string key = t.getgco() + "-" + t.getcode();
  if (!existskey.contains(key)) {
   existskey.add(key);
   boolean result = que.offer(t);
   // logger.trace("待翻译文字[" + t.getgco() + ":" + t.getcode() + "]加入队列结果[" + result
   // + "],队列中数据总个数:" + que.size());
   return result;
  }
  return false;
 }

 @autowired
 private lgtsservice lgtsservice;

 private void save() {
  int cnt = threadcnt.incrementandget();// 当前线程数+1
  if (cnt > maxthreadcnt) {
   // 已启动的线程大于设置的最大线程数直接丢弃
   threadcnt.decrementandget();// +1的线程数再-回去
   return;
  }
  gwalluser user = userutils.getuser();
  thread thr = new thread() {
   public void run() {
    long sleeptime = 30000l;
    userutils.setuser(user);
    boolean continueflag = true;
    int maxcontinuecnt = 5;// 最大连续休眠次数,连续休眠次数超过最大休眠次数后,while循环退出,当前线程销毁
    int continuecnt = 0;// 连续休眠次数

    while (continueflag) {// 队列不为空时执行
     if (objects.isnull(que.peek())) {
      try {
       if (continuecnt > maxcontinuecnt) {
        // 连续休眠次数达到最大连续休眠次数,当前线程将销毁。
        continueflag = false;
        continue;
       }
       // 队列为空,准备休眠
       thread.sleep(sleeptime);
       continuecnt++;
       continue;
      } catch (interruptedexception e) {
       // 休眠失败,无需处理
       e.printstacktrace();
      }
     }
     continuecnt = 0;// 重置连续休眠次数为0

     list<lgts> params = new arraylist<>();
     int totalcnt = que.size();
     que.drainto(params, num_of_every_time);
     stringbuilder utf8q = new stringbuilder();
     string code = "";
     list<lgts> needremove = new arraylist<>();
     for (lgts lgts : params) {
      if (stringutils.isanyblank(code)) {
       code = lgts.getcode();
      }
      // 移除existskey中保存的key,以免下面翻译失败时再次加入队列时,加入不进去
      string key = lgts.getgco() + "-" + lgts.getcode();
      existskey.remove(key);

      if (!code.equalsignorecase(lgts.getcode())) {// 要翻译的目标语言与当前列表中的第一个不一致
       offer(lgts);// 重新将待翻译的语言放回队列
       needremove.add(lgts);
       continue;
      }
      utf8q.append(lgts.getgco()).append("\n");
     }
     params.removeall(needremove);
     logger.debug("队列*" + totalcnt + " 个,获取" + params.size() + " 个符合条件的待翻译内容,编码:" + code);
     string to = "en";
     if (stringutils.isanyblank(utf8q, to)) {
      logger.warn("调用翻译出错,未找到[" + code + "]对应的百度编码。");
      continue;
     }
     map<string, string> result = getbaidutranslation(utf8q.tostring(), translationfrom, to);
     if (objects.isnull(result) || result.isempty()) {// 把没有获取到翻译结果的重新放回队列
      for (lgts lgts : params) {
       offer(lgts);
      }
      logger.debug("本次翻译结果为空。");
      continue;
     }
     int sucesscnt = 0, ignorecnt = 0;
     for (lgts lgts : params) {
      lgts.setbdcode(to);
      string gna = result.get(lgts.getgco());
      if (stringutils.isanyblank(gna)) {
       offer(lgts);// 重新将待翻译的语言放回队列
       continue;
      }
      lgts.setstat(1);
      lgts.setgna(gna);
      int saveresult = lgtsservice.saveignore(lgts);
      if (0 == saveresult) {
       ignorecnt++;
      } else {
       sucesscnt++;
      }
     }
     logger.debug("待翻译个数:" + params.size() + ",翻译成功个数:" + sucesscnt + ",已存在并忽略个数:" + ignorecnt);
    }
    threadcnt.decrementandget();// 运行中的线程数-1
    distory();// 清理数据,必须放在方法最后,否则distory中的判断需要修改
   }

   /**
    * 如果是最后一个线程,清空队列和existskey中的数据
    */
   private void distory() {
    if (0 == threadcnt.get()) {
     // 最后一个线程退出时,执行清理操作
     existskey.clear();
     que.clear();
    }
   }
  };
  thr.setdaemon(true);// 守护线程,如果主线程执行完毕,则此线程会自动销毁
  thr.setname("baidufanyi-" + randomutils.nextint(1000, 9999));
  thr.start();// 启动插入线程
 }

 /**
  * 百度翻译
  * 
  * @param utf8q
  *   待翻译的字符串,需要utf8格式的
  * @param from
  *   百度翻译语言列表中的代码
  *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languagelist
  * @param to
  *   百度翻译语言列表中的代码
  *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languagelist
  * @return 翻译结果
  */
 private map<string, string> getbaidutranslation(string utf8q, string from, string to) {
  map<string, string> result = new hashmap<>();
  string baiduurlstr = "http://api.fanyi.baidu.com/api/trans/vip/translate";
  if (stringutils.isanyblank(baiduurlstr)) {
   logger.warn("百度翻译api接口url相关参数为空!");
   return result;
  }
  map<string, string> params = buildparams(utf8q, from, to);
  if (params.isempty()) {
   return result;
  }

  string sendurl = geturlwithquerystring(baiduurlstr, params);
  try {
   httpclient httpclient = new httpclient();
   httpclient.setmethod("get");
   string remoteresult = httpclient.pub(sendurl, "");
   result = convertremote(remoteresult);
  } catch (exception e) {
   logger.info("百度翻译api返回结果异常!", e);
  }
  return result;
 }

 private map<string, string> convertremote(string remoteresult) {
  map<string, string> result = new hashmap<>();
  if (stringutils.isblank(remoteresult)) {
   return result;
  }
  jsonobject jsonobject = jsonobject.parseobject(remoteresult);
  jsonarray trans_result = jsonobject.getjsonarray("trans_result");
  if (objects.isnull(trans_result) || trans_result.isempty()) {
   return result;
  }
  for (object object : trans_result) {
   jsonobject trans = (jsonobject) object;
   result.put(trans.getstring("src"), trans.getstring("dst"));
  }
  return result;
 }

 private map<string, string> buildparams(string utf8q, string from, string to) {
  if (stringutils.isblank(from)) {
   from = "auto";
  }
  map<string, string> params = new hashmap<string, string>();
  string skstr = "sk";
  string appidstr = "appid";
  if (stringutils.isanyblank(skstr, appidstr)) {
   logger.warn("百度翻译api接口相关参数为空!");
   return params;
  }

  params.put("q", utf8q);
  params.put("from", from);
  params.put("to", to);

  params.put("appid", appidstr);

  // 随机数
  string salt = string.valueof(system.currenttimemillis());
  params.put("salt", salt);

  // 签名
  string src = appidstr + utf8q + salt + skstr; // 加密前的原文
  params.put("sign", md5util.md5encrypt(src).tolowercase());
  return params;
 }

 public static string geturlwithquerystring(string url, map<string, string> params) {
  if (params == null) {
   return url;
  }

  stringbuilder builder = new stringbuilder(url);
  if (url.contains("?")) {
   builder.append("&");
  } else {
   builder.append("?");
  }

  int i = 0;
  for (string key : params.keyset()) {
   string value = params.get(key);
   if (value == null) { // 过滤空的key
    continue;
   }

   if (i != 0) {
    builder.append('&');
   }

   builder.append(key);
   builder.append('=');
   builder.append(encode(value));

   i++;
  }

  return builder.tostring();
 }

 /**
  * 对输入的字符串进行url编码, 即转换为%20这种形式
  * 
  * @param input
  *   原文
  * @return url编码. 如果编码失败, 则返回原文
  */
 public static string encode(string input) {
  if (input == null) {
   return "";
  }

  try {
   return urlencoder.encode(input, "utf-8");
  } catch (unsupportedencodingexception e) {
   e.printstacktrace();
  }

  return input;
 }
}

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。