欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

微服务化之----熔断和隔离

程序员文章站 2022-07-13 16:47:12
...

背景:

 

最近在负责一个数据中心的搭建工作,业务场景较多,比如:历史、实时数据计算和查询,汇总分析数据,以及基于数据与业务相结合提供智能推荐服务。对于每个不同的业务场景,所需要存储介质以及容量都不尽相同。比如:实时数据依赖redis,历史数据依赖hbase、mysql、mongdo,用户画像数据依赖es,redis等等。如果不进行微服务化拆分,其中任何一个组件出现问题,整个数据服务工程将都会有瘫痪的风险。

 

面临的问题:所有功能都被柔和在一个工程里,牵一发动全身,业务扩展难度大;相互干扰,每次新业务上线不仅需要验证新功能,还需要验证以前的功能是否受影响;另外 在大促期间也不易于做灾备、熔断和隔离。

 

由于以上原因,最近对整个系统架构做了微服务化拆分。首先看下老系统架构。

 

老系统架构:

 
微服务化之----熔断和隔离
            
    
    博客分类: 架构 微服务隔离熔断限流java责任链模式 
 

 

 

可以看到这里典型的MVC架构(springmvc),业务代码彼此交织在一起,不便于扩展;各种服务相互依赖,假如hbass或redis整个数据中心将无法正常工作。

 

微服务化架构:

 

再看下微服务拆分后的系统架构
微服务化之----熔断和隔离
            
    
    博客分类: 架构 微服务隔离熔断限流java责任链模式 
 

 

名称解释jsf:京东自己开发的RPC框架,类似于淘宝的duboo(图中虚线箭头表示jsf接口调用)

 

可以看到这里已经将原来的一个大工程,拆成8个独立的子工程:5个jsf工程 + 3个web服务工程。每个子工程只负责自己独立的业务逻辑,有自己独立的redis集群,数据彼此隔离。

 

Jsf服务子工程集(5个):目前第一期是根据存储介质的不同进行拆分。后续还可以无限扩展,比如即将接入的ES jsf服务、mogodb jsf 服务等。对于每种不同的存储介质,还可以按照业务进行垂直和水平拆分。

 

 

mysql jsf工程(目前1个):mysql主要用于存放数据量不大的汇总数据,以及用户信息数据。目前数据量还比较小,这里只需要一个mysql服务即可满足需求。如果有新业务加入,可以按照对mysql进行垂直拆分,拆分成几个业务独立的数据库,对每个数据库再新建jsf工程,最终形成多个mysql jsf工程集对外提供服务。如下:

 

 
微服务化之----熔断和隔离
            
    
    博客分类: 架构 微服务隔离熔断限流java责任链模式 
 

 

如果某个业务mysql表数据量暴增,再进行水平拆分,根据实际情况拆分为多库多表。借助淘宝开源的mycat、DRDS分库分表中间件也很容易实现。

 

 
微服务化之----熔断和隔离
            
    
    博客分类: 架构 微服务隔离熔断限流java责任链模式 
 

 

hbase jsf工程集(4个):为每个不同的业务申请不同的hbase实例(一个实例可以简单理解成mysql的一个数据库)。这里以实例为单位进行服务化,如果一个实例挂掉,也不会影响到其他实例。做到业务数据完全隔离。

 

其他 jsf工程:ES或mongodb等,待接入。

 

至此,简单的服务化拆分已经完成。下面来看微服务怎么做 隔离、熔断、限流。

 

隔离:

 

按照上述方法进行微服务拆分,已经在不同的数据存储介质和不同业务级别完成了服务隔离。保证其中某个服务坏掉,不会影响到其他服务。

 

利用jsf提供的filter功能可以很方便的为各个子服务做自动熔断、限流功能(主流的RPC框架都具备filter功能,如dubbo)。filter一般都采用“责任链模式”,其实"代理模式"也可以做,但"责任链模式"更方便扩展,不同的任务可以分配到不同的层级的filter。下面主要对自动熔断、限流进行讲解。

 

自动熔断:

 

要做自动熔断,我们首先实现的是统一异常捕获。先来看下我们以前jsf服务工程service实现代码:

 

/**
 * 定义服务接口
 * Created by gantianxing on 2017/5/18.
 */
public interface TestMysqlJsf {
 
    ResultModel getById(int id); //根据用户id获取用户信息
 
    ResultModel getByPage();//分页获取用户信息
}
 
/**
 * 服务实现类
 * Created by gantianxing on 2017/5/18.
 */
public class TestMysqlJsfImpl implements TestMysqlJsf{
    private static final Log log = LogFactory.getLog(TestMysqlJsfImpl.class);
 
    @Override
    public ResultModel getById(int id) {
        ResultModel resultModel = new ResultModel();
 
        try {
            DataUser userinfo = null;
            //省略各种计算代码
            resultModel.addAttribute("userInfo",userinfo);
        } catch (Exception e){
            resultModel.fail("xxxxxxxxxxx异常");
            log.error("xxxxxxxxxxx异常",e);
        }
        return resultModel;
    }
 
    @Override
    public ResultModel getByPage() {
        ResultModel resultModel = new ResultModel();
        try {
            List<DataUser> userList = null;
            //省略各种计算代码
            resultModel.addAttribute("userList",userList);
        } catch (Exception e){
            resultModel.fail("xxxxxxxxxxx异常");
            log.error("xxxxxxxxxxx异常",e);
        }
        return resultModel;
    }
}

 

 

以mysql对应的jsf服务工程为例,在该工程里有无数个类似TestMysqlJsf的服务接口。可以看到实现类TestMysqlJsfImpl里为了保证返回给接口调用方的信息足够友好,每个方法体里都加了try{} catch catch (Exception e),捕获所有的异常(避免抛出给调用方)。

这种重复的try catch 遍布所有的接口实现类的每个方法,费时费力,而且也不雅观。

 

怎么能更优雅的实现呢,这里采用的是在服务入口处统一添加一条filter list。首先来看我们第一个filter,统一异常处理filter:

/**
 * Created by gantianxing on 2017/5/18.
 */
public class HandleExceptionFilter extends AbstractFilter {
    private final static Log log = LogFactory.getLog(HandleExceptionFilter.class);
 
    /**
     * 统一异常处理过滤器
     * @param request jsf接口请求信息
     * @return response 如果出现异常,动态构造错误返回信息。
     */
    @Override
    public ResponseMessage invoke(RequestMessage request) {
        ResponseMessage response = null;
        try {
            response = getNext().invoke(request); // 调用链自动往下层执行,直到真实的服务接口被调用
 
        }catch (Exception e){
            String methodName = request.xxxxx().getMethodName(); //获取调用方法名
            String clazzName = request.xxxxx().getClazzName();//获取调用类名
            response = MessageBuilder.buildResponse(request); // 自己构造返回对象
            ResultModel resultModel = new ResultModel();// 动态构造异常返回
            resultModel.fail("接口调用异常:" + e.getMessage());
            response.setResponse(resultModel);
            log.error("接口调用异常:" + clazzName + ":" + methodName, e);
        }
        return response;
    }
}

 

通过配置保证,这层filter在所有业务方法调用之前,首先被调用,其中的response = getNext().invoke(request) 会根据配置再去调用到真实的接口实现方法。

这样所有的“运行时异常”都可以在统一这个过滤器中处理,所有service实现类都不再需要再添加类似try{} catch catch (Exception e) ("非运行时异常"除外)这样的代码,只需处理"非运行时异常"即可。

 

另外,方法性能监控也可以做到这层filter,统一对每个方法性能监控进行埋点,防止出现整个工程到处都是监控埋点的情况(牵涉公司业务太多,代码里没有体现)。

 

有了HandleExceptionFilter这个统一异常处理过滤器之后,上面的TestMysqlJsfImpl可以改为:

 

/**
 * Created by gantianxing on 2017/5/18.
 */
public class TestMysqlJsfImpl implements TestMysqlJsf{
    private static final Log log = LogFactory.getLog(TestMysqlJsfImpl.class);
 
    @Override
    public ResultModel getById(int id) {
        ResultModel resultModel = new ResultModel();
        DataUser userinfo = null;
        //省略各种计算代码
        resultModel.addAttribute("userInfo",userinfo);
        return resultModel;
    }
 
    @Override
    public ResultModel getByPage() {
        ResultModel resultModel = new ResultModel();
        List<DataUser> userList = null;
        //省略各种计算代码
        resultModel.addAttribute("userList",userList);
        return resultModel;
    }
}

 

 

没有了千篇一律的try catch是不是舒服了很多:-D

 

有人要说了,你这跟“自动熔断”有什么关系。别急,我们先看下,为什么要熔断,无非就是服务内部大范围出现异常时自动断开服务,快速的告诉接口调用方“目前服务不可用”。比如:连接数据库超时,或者服务内部调用其他外部接口调用超时等,当某一类异常达到一定的阀指时进行熔断。

 

我们的做法是,在过滤器filter中捕获这些异常,并对某一类异常进行计数,当异常到达一定阀值修改熔断开关为开启状态。将HandleExceptionFilter改造如下:

 

/**
 * Created by gantianxing on 2017/5/18.
 */
public class HandleExceptionFilter extends AbstractFilter {
    private final static Log log = LogFactory.getLog(HandleExceptionFilter.class);
 
    private static AtomicInteger errorcount = new AtomicInteger(0);//错误计数器
 
    @Resource
    private Redis redis;
 
    public void reLoadCount(){
        errorcount.set(0); //故障解除后,手动置0计数器。
    }
 
    /**
     * 统一异常处理过滤器
     * @param request jsf接口请求信息
     * @return response 如果出现异常,动态构造错误返回信息。
     */
    @Override
    public ResponseMessage invoke(RequestMessage request) {
        ResponseMessage response = null;
        try {
            response = getNext().invoke(request); // 调用链自动往下层执行,直到真实的服务接口被调用
        }catch (Exception e){
 
            if (e instanceof ConnectTimeoutException){
                errorcount.getAndIncrement();//原子 +1
                if(errorcount.get() > 50){ //错误次数大于50 熔断器开启
                    redis.setStr("circuit_breaker","on");
                }
            }
            String methodName = request.xxxxx().getMethodName(); //获取调用方法名
            String clazzName = request.xxxxx().getClazzName();//获取调用类名
            response = MessageBuilder.buildResponse(request); // 自己构造返回对象
            ResultModel resultModel = new ResultModel();// 动态构造异常返回
            resultModel.fail("接口调用异常:" + e.getMessage());
            response.setResponse(resultModel);
            log.error("接口调用异常:" + clazzName + ":" + methodName, e);
        }
        return response;
    }
}

每次失败都对异常计数器+1,每次真实接口调用前 先检查异常次数是否到达阀值。

 

在HandleExceptionFilter这层过滤器中只是把熔断开关开启,我们还需要新建一个熔断过滤器filter 添加到HandleExceptionFilter的上层:取名为CircuitBreakerFilter。 它的主要职责:如果熔断开关已经开启,直接返回错误提示;否则继续调用责任链往下执行HandleExceptionFilter。

 

 

/**
 * Created by gantianxing on 2017/5/18.
 */
public class CircuitBreakerFilter extends AbstractFilter {
    private final static Log log = LogFactory.getLog(HandleExceptionFilter.class);
 
    @Resource
    private Redis redis; //redis服务
 
    /**
     * 熔断 过滤器
     * @param request
     * @return
     */
    @Override
    public ResponseMessage invoke(RequestMessage request) {
        ResponseMessage response = null;
        if ("on".equals(redis.get("circuit_breaker"))){
            ResultModel resultModel = new ResultModel();
            resultModel.fail("请求已熔断,请联系服务通过方");
            response.setResponse(resultModel); // 返回结果
            log.info("请求已熔断");
        }else{
            response = getNext().invoke(request); // 调用链往下层执行:HandleExceptionFilter
        }
        return response;
    }
}

 

至此自动熔断实现已经讲完。这里是全熔断实现,如果要实现半熔断,可以再添加“状态模式”实现。

 

自动限流:

 

采用类似"自动熔断"的做法,也可以实现"自动限流",新建CurrentLimitFilter:

/**
 * Created by gantianxing on 2017/5/19.
 */
public class CurrentLimitFilter extends AbstractFilter {
 
    private final static Log log = LogFactory.getLog(CurrentLimitFilter.class);
 
    private static AtomicInteger count = new AtomicInteger(0);
 
    /**
     * 自动限流器
     * @param request
     * @return
     */
    @Override
    public ResponseMessage invoke(RequestMessage request) {
        ResponseMessage response = null;
 
        if (count.get() > 100) { //最高并发超过100 自动限流
            ResultModel resultModel = new ResultModel();
            resultModel.fail("请求到到上限");
            response = MessageBuilder.buildResponse(request); // 自己构造返回对象
            response.setResponse(resultModel); // 返回结果
            log.info("请求到到上限");
        }else{
            count.getAndIncrement(); //进入方法调用,并发计数器+1
            response = getNext().invoke(request); // 自动往下层执行
            count.getAndDecrement();//结束方法调用,并发计数器-1
            return null;
        }
        return response;
    }
}

 

主要采用AtomicInteger做计数器,当进入方法调用时+1,当结束方法调用时-1. 计数器get()方法获取的值即为 该服务器的并发量,如果并发量超过100(根据自己的业务、服务器性能自己配置),则进行限流。当并发量小于100,又自动恢复正常。我们暂且称之为:“丢弃式限流”。

 

这种限流措施,会对调用方产生一定负面影响。有人说,为什么不使用MQ(消息队列)进行限流,还可以保证数据不丢失。其实这是两种不同的手段,针对不同的业务场景。

 

MQ异步限流:适用于后端逻辑处理业务,无需及时向客户端返回处理结果,允许处理请求暂时积压,延迟处理(重要数据要求必须被处理)。比如 订单积压,一般都是采用MQ。

 

丢弃式限流:适用于需及时返回的前端业务,比如一个状态查询,前端页面要求及时返回查询结果(哪怕是错误的也行),否则页面就会被卡住,是一种大促常用降级手段。当服务端并发到达上限时,及时返回一条提示信息,用户再次刷新页面,有可能会得到正常结果。做到保护服务端的同时(预防调用链“雪崩”),让前端也能及时的得到响应。

 

采用“丢弃式限流”、或者直接熔断,可以避免“雪崩”问题。

 
微服务化之----熔断和隔离
            
    
    博客分类: 架构 微服务隔离熔断限流java责任链模式 
 

 

最后 再把三个过滤器串联起来(注意顺序),限流过滤器放在调用链第一层,熔断过滤器放在第二层,统一异常处理过滤器放在第三层。最终调用流程如下:

 

 
微服务化之----熔断和隔离
            
    
    博客分类: 架构 微服务隔离熔断限流java责任链模式 
 

简单总结下:

隔离:要做隔离,一种好的方法就是微服务拆分,让每个小业务成为孤岛,即便是其中一个业务挂掉,其他服务依然可以正常运行。

熔断、限流:在微服务化的基础上可以很方便的做熔断和限流。采用“责任链模式”在服务入口处进行统一封装即可。如果你采用的RPC框架不原生支持filter,可以自己实现一个“责任链模式”融合进去。

扩容:在微服务化后,只需对压力大的子服务进行针对性扩容;对重要的服务数据采用主从备份。总之,可以对不同的自服务灵活的采用不同的灾备手段。

 

最后说下,微服务的缺点:

1、不方便联调测试,需要启动多个服务。解决办法,在开发环境部署一整套服务,开发本机只启动一个正在开发的服务与之进行联调。

2、不方便事务控制,各个子服务构成了一个分布式环境,在需要事务的地方,必须做分布式事务控制。解决办法,对于mysql等关系型分库分布数据库,可以采用mycat等中间件;对于跨服务的,可以采用MQ的事务机制;其他办法,如日志+人工干预等。总之:分布式事务根据业务场景做到“最终一致性”即可。

 

 

以后再总结下分布式事务,这次就到这里吧。