Java 注解实现一个可配置线程池的方法示例
程序员文章站
2024-02-21 23:59:16
前言
项目需要多线程执行一些 task,为了方便各个服务的使用,特意封装了一个公共工具类,下面直接撸代码:
poolconfig(线程池核心配置参数):
/**...
前言
项目需要多线程执行一些 task,为了方便各个服务的使用,特意封装了一个公共工具类,下面直接撸代码:
poolconfig(线程池核心配置参数):
/**
 * Core sizing parameters of the shared thread pool.
 *
 * <table border="1">
 *   <tr><th>Property</th><th>Meaning</th></tr>
 *   <tr><td>queuecapacity</td><td>capacity of the executor's task queue (initial value 200)</td></tr>
 *   <tr><td>count</td><td>core pool size (initial value 0)</td></tr>
 *   <tr><td>maxcount</td><td>maximum pool size (initial value 0)</td></tr>
 *   <tr><td>alivesec</td><td>keep-alive time of idle threads, in seconds (initial value 0)</td></tr>
 * </table>
 *
 * <p>{@code taskpool.init()} only forwards values greater than zero to the executor, so a
 * property left at 0 means "keep whatever the executor itself defaults to". The original
 * notes mention a 60 s keep-alive default; this class does not set that value itself.
 */
public class poolconfig {

    /** Capacity of the executor's task queue. */
    private int queuecapacity = 200;

    /** Core pool size. */
    private int count = 0;

    /** Maximum pool size. */
    private int maxcount = 0;

    /** Keep-alive time of idle threads, in seconds. */
    private int alivesec;

    // ---- queue capacity ----

    public int getqueuecapacity() {
        return this.queuecapacity;
    }

    public void setqueuecapacity(int queuecapacity) {
        this.queuecapacity = queuecapacity;
    }

    // ---- core pool size ----

    public int getcount() {
        return this.count;
    }

    public void setcount(int count) {
        this.count = count;
    }

    // ---- maximum pool size ----

    public int getmaxcount() {
        return this.maxcount;
    }

    public void setmaxcount(int maxcount) {
        this.maxcount = maxcount;
    }

    // ---- keep-alive seconds ----

    public int getalivesec() {
        return this.alivesec;
    }

    public void setalivesec(int alivesec) {
        this.alivesec = alivesec;
    }
}
threadpoolconfig(线程池配置 yml配置项以thread开头):
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Thread-pool settings bound from configuration properties that start with {@code thread}.
 *
 * <table border="1">
 *   <tr><th>Property</th><th>Meaning</th></tr>
 *   <tr><td>pool</td><td>core sizing parameters, see {@link poolconfig}</td></tr>
 *   <tr><td>count</td><td>per-task submission count, keyed by task name</td></tr>
 * </table>
 *
 * <p>{@code taskpool} sums the values of {@code count} to derive a minimum core pool size and
 * submits each named task that many times.
 */
@Component
@ConfigurationProperties(prefix = "thread")
public class threadpoolconfig {

    /** Core sizing parameters; starts out with the defaults of {@link poolconfig}. */
    private poolconfig pool = new poolconfig();

    /**
     * Task name to number of submissions. There is no setter, so entries are added through
     * the map returned by {@link #getcount()}.
     */
    private final Map<String, Integer> count = new HashMap<>();

    /**
     * @return the core sizing parameters
     */
    public poolconfig getpool() {
        return pool;
    }

    /**
     * @param pool the core sizing parameters to use
     */
    public void setpool(poolconfig pool) {
        this.pool = pool;
    }

    /**
     * @return the live, mutable map of task name to submission count
     */
    public Map<String, Integer> getcount() {
        return count;
    }
}
定义task注解,方便使用:
@target(elementtype.type) @retention(retentionpolicy.runtime) @documented @component public @interface excutortask { /** * the value may indicate a suggestion for a logical excutortask name, * to be turned into a spring bean in case of an autodetected excutortask . * @return the suggested excutortask name, if any */ string value() default ""; }
通过反射获取使用task注解的任务集合:
public class beans { private static final char prefix = '.'; public static concurrentmap<string, string> scanbeanclassnames(){ concurrentmap<string, string> beanclassnames = new concurrenthashmap<>(); classpathscanningcandidatecomponentprovider provider = new classpathscanningcandidatecomponentprovider(false); provider.addincludefilter(new annotationtypefilter(excutortask.class)); for(package pkg : package.getpackages()){ string basepackage = pkg.getname(); set<beandefinition> components = provider.findcandidatecomponents(basepackage); for (beandefinition component : components) { string beanclassname = component.getbeanclassname(); try { class<?> clazz = class.forname(component.getbeanclassname()); boolean isannotationpresent = clazz.isannotationpresent(zimatask.class); if(isannotationpresent){ zimatask task = clazz.getannotation(excutortask.class); string aliasname = task.value(); if(aliasname != null && !"".equals(aliasname)){ beanclassnames.put(aliasname, component.getbeanclassname()); } } } catch (classnotfoundexception e) { e.printstacktrace(); } beanclassnames.put(beanclassname.substring(beanclassname.lastindexof(prefix) + 1), component.getbeanclassname()); } } return beanclassnames; } }
线程执行类taskpool:
@component public class taskpool { public threadpooltaskexecutor pooltaskexecutor; @autowired private threadpoolconfig threadpoolconfig; @autowired private applicationcontext context; private final integer max_pool_size = 2000; private poolconfig poolcfg; private map<string, integer> taskscount; private concurrentmap<string, string> beanclassnames; @postconstruct public void init() { beanclassnames = beans.scanbeanclassnames(); pooltaskexecutor = new threadpooltaskexecutor(); poolcfg = threadpoolconfig.getpool(); taskscount = threadpoolconfig.getcount(); int corepoolsize = poolcfg.getcount(), maxpoolsize = poolcfg.getmaxcount(), queuecapacity = poolcfg.getqueuecapacity(), minpoolsize = 0, maxcount = (corepoolsize << 1); for(string taskname : taskscount.keyset()){ minpoolsize += taskscount.get(taskname); } if(corepoolsize > 0){ if(corepoolsize <= minpoolsize){ corepoolsize = minpoolsize; } }else{ corepoolsize = minpoolsize; } if(queuecapacity > 0){ pooltaskexecutor.setqueuecapacity(queuecapacity); } if(corepoolsize > 0){ if(max_pool_size < corepoolsize){ corepoolsize = max_pool_size; } pooltaskexecutor.setcorepoolsize(corepoolsize); } if(maxpoolsize > 0){ if(maxpoolsize <= maxcount){ maxpoolsize = maxcount; } if(max_pool_size < maxpoolsize){ maxpoolsize = max_pool_size; } pooltaskexecutor.setmaxpoolsize(maxpoolsize); } if(poolcfg.getalivesec() > 0){ pooltaskexecutor.setkeepaliveseconds(poolcfg.getalivesec()); } pooltaskexecutor.initialize(); } public void execute(class<?>... clazz){ int i = 0, len = taskscount.size(); for(; i < len; i++){ integer taskcount = taskscount.get(i); for(int t = 0; t < taskcount; t++){ try{ object taskobj = context.getbean(clazz[i]); if(taskobj != null){ pooltaskexecutor.execute((runnable) taskobj); } }catch(exception ex){ ex.printstacktrace(); } } } } public void execute(string... 
args){ int i = 0, len = taskscount.size(); for(; i < len; i++){ integer taskcount = taskscount.get(i); for(int t = 0; t < taskcount; t++){ try{ object taskobj = null; if(context.containsbean(args[i])){ taskobj = context.getbean(args[i]); }else{ if(beanclassnames.containskey(args[i].tolowercase())){ class<?> clazz = class.forname(beanclassnames.get(args[i].tolowercase())); taskobj = context.getbean(clazz); } } if(taskobj != null){ pooltaskexecutor.execute((runnable) taskobj); } }catch(exception ex){ ex.printstacktrace(); } } } } public void execute(){ for(string taskname : taskscount.keyset()){ integer taskcount = taskscount.get(taskname); for(int t = 0; t < taskcount; t++){ try{ object taskobj = null; if(context.containsbean(taskname)){ taskobj = context.getbean(taskname); }else{ if(beanclassnames.containskey(taskname)){ class<?> clazz = class.forname(beanclassnames.get(taskname)); taskobj = context.getbean(clazz); } } if(taskobj != null){ pooltaskexecutor.execute((runnable) taskobj); } }catch(exception ex){ ex.printstacktrace(); } } } } }
如何使用?(做事就要做全套 ^_^)
1.因为使用的是 Spring Boot 项目,需要在 application.properties 或者 application.yml 中添加:
# 配置执行的 task 线程数(key 为任务名,value 为该任务提交的次数)
thread.count.needexcutortask=4
# 最大存活时间,单位为秒(该值会传给 setKeepAliveSeconds;300000 秒约 83 小时,如需 5 分钟请填 300)
thread.pool.alivesec=300000
# 其他配置同理
2.将我们写的线程配置进行装载到我们的项目中
/**
 * Starts the configured tasks once this bean has been constructed and injected.
 */
@Configuration
public class taskmanager {

    @Resource
    private taskpool taskpool;

    /**
     * Calls the no-argument {@code taskpool.execute()}, which submits every task listed in
     * the thread-pool configuration.
     */
    @PostConstruct
    public void executor() {
        taskpool.execute();
    }
}
3.具体使用
@excutortask public class needexcutortask implements runnable{ @override public void run() { thread.sleep(1000l); log.info("====== 任务执行 =====") } }
以上就是创建一个可扩展的线程池相关的配置(望指教~~~)。希望对大家的学习有所帮助,也希望大家多多支持。