欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

xxl-job如何滥用netty导致的问题及解决方案

程序员文章站 2022-06-25 20:18:21
netty作为一种高性能的网络编程框架,在很多开源项目中大放异彩,十分亮眼,但是在有些项目中却被滥用,导致使用者使用起来非常的难受。笔者使用的是2.3.0版本的xxl-job,也是当前的最新版本;下面...

netty作为一种高性能的网络编程框架,在很多开源项目中大放异彩,十分亮眼,但是在有些项目中却被滥用,导致使用者使用起来非常的难受。

笔者使用的是2.3.0版本的xxl-job,也是当前的最新版本;下面所有的代码修改全部基于2.3.0版本的xxl-job源代码

其中,xxl-job-admin对应着项目:

spring-boot项目对应着示例项目:

一、xxl-job存在的多端口问题

关于xxl-job如何使用的问题,可以参考我的另外一篇文章:

现在java开发基本上已经离不开spring boot了吧,我在spring boot中集成了xxl-job-core组件并且已经能够正常使用,但是一旦部署到测试环境就不行了,这是因为测试环境使用了docker,spring boot集成xxl-job-core组件之后会额外开启9999端口号给xxl-job-admin调用使用,如果docker不开启宿主机到docker的端口映射,xxl-job-admin自然就会调用失败。这导致了以下问题:

  • 每个spring boot程序都要开两个端口号,意味着同时运行着两个服务进行端口监听,浪费计算和内存资源
  • 如果使用docker部署,需要再额外做宿主机和容器的9999端口号的映射,否则外部的xxl-job-admin将无法访问。

那如果两个不同的服务都集成了xxl-job,但是部署在同一台机器上,又会发生什么呢?答案是如果不指定特定端口号,两个服务肯定都要使用9999端口号,势必会端口冲突,但是xxl-job已经想到了9999端口号被占用的情况,如果9999端口号被占用,则会端口号加一再重试。

xxl-job-core组件额外开启9999端口号到底合不合理?

举个例子:spring boot程序集成swagger-ui是很常见的操作吧,也没见swagger-ui再额外开启端口号啊,我认为是不合理的。但是,我认为作者这样做也有他的考虑---并非所有程序都是spring-boot的程序,也有使用其它框架的程序,使用独立的netty server作为客户端能够保证在使用java的任意xxl-job客户端都能稳定的向xxl-job-admin提供服务。然而java开发者们绝大多数情况下都是使用spirng-boot构建程序,在这种情况下,作者偷懒没有构建专门在spirng boot框架下使用的xxl-job-core,而是想了个类似万金油的蠢招解决问题,让所有在spring-boot框架下的开发者都一起难受,实在是令人费解。

二、源码追踪

一切的起点要从spring-boot程序集成xxl-job-core说起,集成方式很简单,只需要成功创建一个xxljobspringexecutor bean对象即可。

@bean
    public xxljobspringexecutor xxljobexecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        xxljobspringexecutor xxljobspringexecutor = new xxljobspringexecutor();
        xxljobspringexecutor.setadminaddresses(adminaddresses);
        xxljobspringexecutor.setappname(appname);
        xxljobspringexecutor.setaddress(address);
        xxljobspringexecutor.setip(ip);
        xxljobspringexecutor.setport(port);
        xxljobspringexecutor.setaccesstoken(accesstoken);
        xxljobspringexecutor.setlogpath(logpath);
        xxljobspringexecutor.setlogretentiondays(logretentiondays);

        return xxljobspringexecutor;
    }

xxljobspringexecutor对象创建完成之后会做一些xxl-job初始化的操作,包含连接xxl-job-admin以及启动netty server。

展开xxljobspringexecutor源码,可以看到它实现了smartinitializingsingleton接口,这就意味着bean对象创建完成之后会回调aftersingletonsinstantiated接口

// start
    @override
    public void aftersingletonsinstantiated() {

        // init jobhandler repository
        /*initjobhandlerrepository(applicationcontext);*/

        // init jobhandler repository (for method)
        initjobhandlermethodrepository(applicationcontext);

        // refresh gluefactory
        gluefactory.refreshinstance(1);

        // super start
        try {
            super.start();
        } catch (exception e) {
            throw new runtimeexception(e);
        }
    }

super.start();这行代码中,会调用父类xxljobexecutor的start方法做初始化

public void start() throws exception {

        // init logpath
        xxljobfileappender.initlogpath(logpath);

        // init invoker, admin-client
        initadminbizlist(adminaddresses, accesstoken);


        // init joblogfilecleanthread
        joblogfilecleanthread.getinstance().start(logretentiondays);

        // init triggercallbackthread
        triggercallbackthread.getinstance().start();

        // init executor-server
        initembedserver(address, ip, port, appname, accesstoken);
    }

initembedserver(address, ip, port, appname, accesstoken);这行代码做开启netty-server的操作

 private void initembedserver(string address, string ip, int port, string appname, string accesstoken) throws exception {

        // fill ip port
        port = port>0?port: netutil.findavailableport(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: iputil.getip();

        // generate address
        if (address==null || address.trim().length()==0) {
            string ip_port_address = iputil.getipport(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }

        // accesstoken
        if (accesstoken==null || accesstoken.trim().length()==0) {
            logger.warn(">>>>>>>>>>> xxl-job accesstoken is empty. to ensure system security, please set the accesstoken.");
        }

        // start
        embedserver = new embedserver();
        embedserver.start(address, port, appname, accesstoken);
    }

可以看到这里会创建embedserver对象,并且使用start方法开启netty-server,在这里就能看到熟悉的一大坨了

xxl-job如何滥用netty导致的问题及解决方案

除了开启读写空闲检测之外,就只做了一件事:开启http服务,也就是说,xxl-job-admin是通过http请求调用客户端的接口触发客户端的任务调度的。最终处理方法在embedhttpserverhandler类中,顺着embedhttpserverhandler类的方法找,可以最终找到处理的方法com.xxl.job.core.server.embedserver.embedhttpserverhandler#process

private object process(httpmethod httpmethod, string uri, string requestdata, string accesstokenreq) {
    // valid
    if (httpmethod.post != httpmethod) {
        return new returnt<string>(returnt.fail_code, "invalid request, httpmethod not support.");
    }
    if (uri==null || uri.trim().length()==0) {
        return new returnt<string>(returnt.fail_code, "invalid request, uri-mapping empty.");
    }
    if (accesstoken!=null
        && accesstoken.trim().length()>0
        && !accesstoken.equals(accesstokenreq)) {
        return new returnt<string>(returnt.fail_code, "the access token is wrong.");
    }

    // services mapping
    try {
        if ("/beat".equals(uri)) {
            return executorbiz.beat();
        } else if ("/idlebeat".equals(uri)) {
            idlebeatparam idlebeatparam = gsontool.fromjson(requestdata, idlebeatparam.class);
            return executorbiz.idlebeat(idlebeatparam);
        } else if ("/run".equals(uri)) {
            triggerparam triggerparam = gsontool.fromjson(requestdata, triggerparam.class);
            return executorbiz.run(triggerparam);
        } else if ("/kill".equals(uri)) {
            killparam killparam = gsontool.fromjson(requestdata, killparam.class);
            return executorbiz.kill(killparam);
        } else if ("/log".equals(uri)) {
            logparam logparam = gsontool.fromjson(requestdata, logparam.class);
            return executorbiz.log(logparam);
        } else {
            return new returnt<string>(returnt.fail_code, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (exception e) {
        logger.error(e.getmessage(), e);
        return new returnt<string>(returnt.fail_code, "request error:" + throwableutil.tostring(e));
    }
}

从这段代码的逻辑可以看到

  • 只接受post请求
  • 如果有token,则会校验token
  • 只提供/beat、/idelbeat、/run、/kill、/log 五个接口,所有请求的处理都会委托给executorbiz处理。

最后,netty将executorbiz处理结果写回xxl-job-admin,然后请求就结束了。这里netty扮演的角色非常简单,我认为可以使用spring-mvc非常容易的替换掉它的功能。

三、使用spring-mvc替换netty的功能

1.新增spring-mvc代码

这里要修改xxl-job-core的源代码,首先,加入spring-mvc的依赖

		<!-- spring-web -->
		<dependency>
			<groupid>org.springframework</groupid>
			<artifactid>spring-web</artifactid>
			<version>${spring.version}</version>
			<scope>provided</scope>
		</dependency>

然后新增controller文件

package com.xxl.job.core.controller;

import com.xxl.job.core.biz.impl.executorbizimpl;
import com.xxl.job.core.biz.model.*;
import org.springframework.web.bind.annotation.postmapping;
import org.springframework.web.bind.annotation.requestbody;
import org.springframework.web.bind.annotation.restcontroller;

/**
 * @author kdyzm
 * @date 2021/5/7
 */
@restcontroller
public class xxljobcontroller {

    @postmapping("/beat")
    public returnt<string> beat() {
        return new executorbizimpl().beat();
    }

    @postmapping("/idlebeat")
    public returnt<string> idlebeat(@requestbody idlebeatparam param) {
        return new executorbizimpl().idlebeat(param);
    }

    @postmapping("/run")
    public returnt<string> run(@requestbody triggerparam param) {
        return new executorbizimpl().run(param);
    }

    @postmapping("/kill")
    public returnt<string> kill(@requestbody killparam param) {
        return new executorbizimpl().kill(param);
    }

    @postmapping("/log")
    public returnt<logresult> log(@requestbody logparam param) {
        return new executorbizimpl().log(param);
    }
}

2.删除老代码&移除netty依赖

之后,就要删除老的代码了,修改com.xxl.job.core.server.embedserver#start方法,清空所有代码,新增

// start registry
startregistry(appname, address);

然后删除embedserver类中的以下两个变量及相关的引用

 private executorbiz executorbiz;
    private thread thread;

之后删除netty的依赖

		<!-- ********************** embed server: netty + gson ********************** -->
		<dependency>
			<groupid>io.netty</groupid>
			<artifactid>netty-all</artifactid>
			<version>${netty-all.version}</version>
		</dependency>

将报错的代码全部删除,之后就可以编译成功了,当然这还不行。

3.修改注册到xxl-job-admin的端口号

注册的ip地址可以不用改,但是端口号要取spring-boot程序的端口号。

因为要复用springk-boot容器的端口号,所以这里注册的端口号要和它保持一致,修改com.xxl.job.core.executor.xxljobexecutor#initembedserver方法,注释掉

port = port > 0 ? port : netutil.findavailableport(9999);

然后修改spring-boot的配置文件,xxl-job的端口号配置改成server.port

server.port=8081
xxl.job.executor.port=${server.port}

在创建xxljobspringexecutor bean对象的时候将改值传递给它。

@bean
public xxljobspringexecutor xxljobexecutor() {
    logger.info(">>>>>>>>>>> xxl-job config init.");
    xxljobspringexecutor xxljobspringexecutor = new xxljobspringexecutor();
    xxljobspringexecutor.setadminaddresses(adminaddresses);
    xxljobspringexecutor.setappname(appname);
    xxljobspringexecutor.setaddress(address);
    xxljobspringexecutor.setip(ip);
    xxljobspringexecutor.setport(port);
    xxljobspringexecutor.setaccesstoken(accesstoken);
    xxljobspringexecutor.setlogpath(logpath);
    xxljobspringexecutor.setlogretentiondays(logretentiondays);
    return xxljobspringexecutor;
}

4.将xxl-job-core改造成spring-boot-starter

上面改造完了之后已经将逻辑变更为使用spring-mvc,但是spring-boot程序还没有办法扫描到xxl-job-core中的controller,可以手动扫描包,这里推荐使用spring-boot-starter,这样只需要将xxl-job-core加入classpath,就可以自动生效。

在 com.xxl.job.core.config包下新建config类

package com.xxl.job.core.config;

import org.springframework.context.annotation.componentscan;
import org.springframework.context.annotation.configuration;

/**
 * @author kdyzm
 * @date 2021/5/7
 */
@configuration
@componentscan(basepackages = {"com.xxl.job.core.controller"})
public class config {
}

src/main/resources/meta-inf文件夹下新建spring.factories文件,文件内容如下

org.springframework.boot.autoconfigure.enableautoconfiguration=\
com.xxl.job.core.config.config

5.增加特殊前缀匹配

上面修改之后将使用spring mvc接口替代原netty功能提供的http接口,但是暴露出的接口是/run、/beat、/kill这种有可能和宿主服务路径冲突的接口,为了防止出现路径冲突,做出以下修改

修改com.xxl.job.core.controller.xxljobcontroller类,添加@requestmapping("/xxl-job")

@restcontroller
@requestmapping("/xxl-job")
public class xxljobcontroller {
	...
}

修改com.xxl.job.core.biz.client.executorbizclient类,为每个请求添加/xxl-job前缀

package com.xxl.job.core.biz.client;

import com.xxl.job.core.biz.executorbiz;
import com.xxl.job.core.biz.model.*;
import com.xxl.job.core.util.xxljobremotingutil;

/**
 * admin api test
 *
 * @author xuxueli 2017-07-28 22:14:52
 */
public class executorbizclient implements executorbiz {

    public executorbizclient() {
    }
    public executorbizclient(string addressurl, string accesstoken) {
        this.addressurl = addressurl;
        this.accesstoken = accesstoken;

        // valid
        if (!this.addressurl.endswith("/")) {
            this.addressurl = this.addressurl + "/";
        }
    }

    private string addressurl ;
    private string accesstoken;
    private int timeout = 3;


    @override
    public returnt<string> beat() {
        return xxljobremotingutil.postbody(addressurl+"xxl-job/beat", accesstoken, timeout, "", string.class);
    }

    @override
    public returnt<string> idlebeat(idlebeatparam idlebeatparam){
        return xxljobremotingutil.postbody(addressurl+"xxl-job/idlebeat", accesstoken, timeout, idlebeatparam, string.class);
    }

    @override
    public returnt<string> run(triggerparam triggerparam) {
        return xxljobremotingutil.postbody(addressurl + "xxl-job/run", accesstoken, timeout, triggerparam, string.class);
    }

    @override
    public returnt<string> kill(killparam killparam) {
        return xxljobremotingutil.postbody(addressurl + "xxl-job/kill", accesstoken, timeout, killparam, string.class);
    }

    @override
    public returnt<logresult> log(logparam logparam) {
        return xxljobremotingutil.postbody(addressurl + "xxl-job/log", accesstoken, timeout, logparam, logresult.class);
    }

}

这样,就全部修改完了。

四、测试

重启xxl-job-executor-sample-springboot项目,查看注册到xxl-job-admin上的信息

xxl-job如何滥用netty导致的问题及解决方案

可以看到端口号已经不是默认的9999,而是和spring-boot程序保持一致的端口号,然后执行默认的job

xxl-job如何滥用netty导致的问题及解决方案

可以看到已经执行成功,在查看日志详情

xxl-job如何滥用netty导致的问题及解决方案

日志也一切正常,表示一切都改造成功了。

完整的代码修改:

五、实际使用

由于原作者基本上不理睬人,我克隆了项目2.3.0版本并且新增了2.4.1版本:

有需要的可以下载源代码自己打包xxl-job-core项目上传私服后就可以使用了

以上就是xxl-job如何滥用netty导致的问题及解决方案的详细内容,更多关于xxl-job滥用netty的资料请关注其它相关文章!