异步调度设计
程序员文章站
2022-05-18 12:47:39
...
平常我们做应用功能的时候,经常会碰到A、B、C等多起事件取数据,进行组装后反馈结果集。
//用户信息 userService.getProfile(userId); //用户信用信息 accountService.getAccount(userId); //SNS snsService.getSNS(userId); //send email\sms platformService.sendMessage(userId,message)
综上所述,我们能够观察得到其实上述4次事件完全可以并行处理,现有的方式我们的执行计划是采纳串行的方式
明显能够感觉得到A+B+C+D所消耗的时间才是这个method方法执行所消耗的时间,那如果采纳为并行的方式进行调用呢?
串行计划耗时:1+1+3+1=6
并行计划耗时:3
底层最为重要的一步,通过代理的方式,把类中的method都进行future调用
采用的是阿里的异步调用组件,但是代码有些老我进行了微调优,但是核心的内容主要还是proxy的方式,代码挺丑的,但是蛮有趣的方式进行异步调用。
考虑到并不是每个逻辑处理都需要异步的方式,
spring配置
<bean id="autoProxyCreator" class="spring.lifecycle.BeanNameAutoProxyCreator"> <property name="beanNames"> <list> <value>*Service</value> <value>*DAO</value> </list> </property> <property name="interceptorFilter"> <map> <entry key="*Service" value="transactionInterceptor,paramterInterceptor,monitorInterceptor"/> </map> </property> </bean>我在AbstractAutoProxyCreator中新增的一属性值
private Map<String, String> interceptorFilter = new HashMap<String, String>();然后进行拦截工作,指定不同的Service被拦截不同的拦截器
protected Advisor[] buildAdvisors(String beanName, Object[] specificInterceptors) { Iterator itor = interceptorFilter.entrySet().iterator(); boolean ismatch = false; while (itor.hasNext()) { Map.Entry<String, String> entry = (Map.Entry<String, String>) itor.next(); String key = entry.getKey(); String values = entry.getValue(); if (key.contains("*")) { key = key.replace("*", ""); Pattern pattern = Pattern.compile(".*(" + key + "$)"); Matcher matcher = pattern.matcher(beanName); ismatch = matcher.matches(); } else { ismatch = key.matches(beanName); } if (ismatch) { this.interceptorNames = values.split(","); } } ...
前几天阿里的一位问起如何有没有考虑异步校验工作,一开始时候没想到,但是回过头想了下异步校验可能有其特殊的场合需要,正常情况下应该只需要顺序校验field。所以依托于其组件上层,写了一个异步注解的校验方式
核心代码如下
public void validateFieldMethod(final Object o) throws Exception { Field[] fields = o.getClass().getDeclaredFields(); for (final Field field : fields) { if (field.getAnnotation(ValidationDriver.class) != null) { field.setAccessible(true); Future future = executor.submit(new AsyncLoadCallable() { public Object call() throws Exception { try { return AnnotationValidator.validate(field.get(o)); } catch (Throwable e) { throw new AsyncLoadException("future invoke error!", e); } } public AsyncLoadConfig getConfig() { return config; } }); } } }
public class AsyncLoadEnhanceProxy<T> implements AsyncLoadProxy<T> { private T service; private AsyncLoadConfig config; private AsyncLoadExecutor executor; private Class<T> targetClass; public AsyncLoadEnhanceProxy(){ } public AsyncLoadEnhanceProxy(T service, AsyncLoadExecutor executor){ this(service, new AsyncLoadConfig(), executor); } public AsyncLoadEnhanceProxy(T service, AsyncLoadConfig config, AsyncLoadExecutor executor){ this.service = service; this.config = config; this.executor = executor; this.targetClass = (Class<T>) service.getClass();// 默认的代理class对象即为service } public T getProxy() { validate(); return getProxyInternal(); } /** * 相应的检查方法 */ private void validate() { AsyncLoadUtils.notNull(service, "service should not be null"); AsyncLoadUtils.notNull(config, "config should not be null"); AsyncLoadUtils.notNull(executor, "executor should not be null"); if (Modifier.isFinal(targetClass.getModifiers())) { // 目前暂不支持final类型的处理,以后可以考虑使用jdk // proxy throw new AsyncLoadException("Enhance proxy not support final class :" + targetClass.getName()); } if (!Modifier.isPublic(targetClass.getModifiers())) { // 处理如果是非public属性,则不进行代理,强制访问会出现IllegalAccessException,比如一些内部类或者匿名类不允许直接访问 throw new AsyncLoadException("Enhance proxy not support private/protected class :" + targetClass.getName()); } } /** * 异步校验 * @param o * @throws Exception */ public void validateFieldMethod(final Object o) throws Exception { Field[] fields = o.getClass().getDeclaredFields(); for (final Field field : fields) { if (field.getAnnotation(ValidationDriver.class) != null) { field.setAccessible(true); Future future = executor.submit(new AsyncLoadCallable() { public Object call() throws Exception { try { return AnnotationValidator.validate(field.get(o)); } catch (Throwable e) { throw new AsyncLoadException("future invoke error!", e); } } public AsyncLoadConfig getConfig() { return config; } }); } } } class AsyncLoadCallbackFilter implements CallbackFilter { public int accept(Method method) { // 预先进行匹配,直接计算好需要处理的method,避免动态匹配浪费性能 if (AsyncLoadObject.class.isAssignableFrom(method.getDeclaringClass())) {// 判断对应的方法是否属于AsyncLoadObject return 0; // for AsyncLoadServiceInterceptor } else { Map<AsyncLoadMethodMatch, Long> matches = config.getMatches(); Set<AsyncLoadMethodMatch> methodMatchs = matches.keySet(); if (methodMatchs != null && !methodMatchs.isEmpty()) { for (Iterator<AsyncLoadMethodMatch> methodMatch = methodMatchs.iterator(); methodMatch.hasNext();) { if (methodMatch.next().matches(method)) { return 2; // for AsyncLoadInterceptor } } } return 1; // for AsyncLoadDirect } } } class AsyncLoadServiceInterceptor implements MethodInterceptor { public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable { if ("_getOriginalClass".equals(method.getName())) { return getOriginalClass(); } throw new AsyncLoadException("method[" + method.getName() + "] is not support!"); } private Object getOriginalClass() { return targetClass; } } class AsyncLoadDirect implements Dispatcher { public Object loadObject() throws Exception { return service; } } class AsyncLoadInterceptor implements MethodInterceptor { public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable { Long timeout = getMatchTimeout(method); final Object finObj = service; final Object[] finArgs = args; final Method finMethod = method; Class returnClass = method.getReturnType(); if (Void.TYPE.isAssignableFrom(returnClass)) {// 判断返回值是否为void // 不处理void的函数调用 return finMethod.invoke(finObj, finArgs); } else if (!Modifier.isPublic(returnClass.getModifiers())) { // 处理如果是非public属性,则不进行代理,强制访问会出现IllegalAccessException,比如一些内部类或者匿名类不允许直接访问 return finMethod.invoke(finObj, finArgs); } else if (Modifier.isFinal(returnClass.getModifiers())) { // 处理特殊的final类型,目前暂不支持,后续可采用jdk proxy return finMethod.invoke(finObj, finArgs); } else if (returnClass.isPrimitive() || returnClass.isArray()) { // 不处理特殊类型,因为无法使用cglib代理 return finMethod.invoke(finObj, finArgs); } else if (returnClass == Object.class) { // 针对返回对象是Object类型,不做代理。没有具体的method,代理没任何意义 return finMethod.invoke(finObj, finArgs); } else { Future future = executor.submit(new AsyncLoadCallable() { public Object call() throws Exception { try { return finMethod.invoke(finObj, finArgs);// 需要直接委托对应的finObj(service)进行处理 } catch (Throwable e) { throw new AsyncLoadException("future invoke error!", e); } } public AsyncLoadConfig getConfig() { return config; } }); // 够造一个返回的AsyncLoadResult AsyncLoadResult result = new AsyncLoadResult(returnClass, future, timeout); // 继续返回一个代理对象 AsyncLoadObject asyncProxy = (AsyncLoadObject) result.getProxy(); // 添加到barrier中 if (config.getNeedBarrierSupport()) { AsyncLoadBarrier.addTask((AsyncLoadObject) asyncProxy); } // 返回对象 return asyncProxy; } } /** * 返回对应的匹配的timeout时间,一定能找到对应的匹配点 * * @param method * @return */ private Long getMatchTimeout(Method method) { Map<AsyncLoadMethodMatch, Long> matches = config.getMatches(); Set<Map.Entry<AsyncLoadMethodMatch, Long>> entrys = matches.entrySet(); if (entrys != null && !entrys.isEmpty()) { for (Iterator<Map.Entry<AsyncLoadMethodMatch, Long>> iter = entrys.iterator(); iter.hasNext();) { Map.Entry<AsyncLoadMethodMatch, Long> entry = iter.next(); if (entry.getKey().matches(method)) { return entry.getValue(); } } } return config.getDefaultTimeout(); } } // =========================== help mehotd ================================= /** * 优先从Repository进行获取ProxyClass,创建对应的object * * @return */ private T getProxyInternal() { Class proxyClass = AsyncLoadProxyRepository.getProxy(targetClass.getName()); if (proxyClass == null) { Enhancer enhancer = new Enhancer(); if (targetClass.isInterface()) { // 判断是否为接口,优先进行接口代理可以解决service为final enhancer.setInterfaces(new Class[] { targetClass }); } else { enhancer.setSuperclass(targetClass); } enhancer.setCallbackTypes(new Class[] { AsyncLoadServiceInterceptor.class, AsyncLoadDirect.class, AsyncLoadInterceptor.class }); enhancer.setCallbackFilter(new AsyncLoadCallbackFilter()); proxyClass = enhancer.createClass(); // 注册proxyClass AsyncLoadProxyRepository.registerProxy(targetClass.getName(), proxyClass); } Enhancer.registerCallbacks(proxyClass, new Callback[] { new AsyncLoadServiceInterceptor(), new AsyncLoadDirect(), new AsyncLoadInterceptor() }); try { return (T) AsyncLoadReflectionHelper.newInstance(proxyClass); } finally { // clear thread callbacks to allow them to be gc'd Enhancer.registerStaticCallbacks(proxyClass, null); } } // ====================== setter / getter =========================== public void setService(T service) { this.service = service; if (targetClass == null) { this.targetClass = (Class<T>) service.getClass(); } } public void setConfig(AsyncLoadConfig config) { this.config = config; } public void setExecutor(AsyncLoadExecutor executor) { this.executor = executor; } public void setTargetClass(Class targetClass) { this.targetClass = targetClass; } }
上一篇: 网页使用Google Font API(字体)的方法
下一篇: 苹果和梨可以一起煮吗,有哪些煮法?