欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

动态路由

程序员文章站 2022-03-03 20:45:49
...

最近基于项目要求,使用了nacos配置网关实现动态路由,记录一下实现步骤、如下:

1、路由接口实现类:

package com.exp.gateway.route;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.cloud.gateway.route.RouteDefinitionWriter;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

/**
 * @创建人 yfc
 * @创建时间 2020-12-07 14:56
 * @描述
 */
@Service
public class DynamicRouteServiceImpl implements ApplicationEventPublisherAware {
	@Autowired
	private RouteDefinitionWriter routeDefinitionWriter;
	private static final Logger log = LoggerFactory.getLogger(DynamicRouteServiceImplByNacos.class);

	/**
	 * 发布事件
	 */
	@Autowired
	private ApplicationEventPublisher publisher;

	@Override
	public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
		this.publisher = applicationEventPublisher;
	}

	/**
	 * 删除路由
	 * @param id
	 * @return
	 */
	public String delete(String id) {
		try {
			log.info("gateway delete route id {}",id);
			this.routeDefinitionWriter.delete(Mono.just(id));
			return "delete success";
		} catch (Exception e) {
			return "delete fail";
		}
	}
	/**
	 * 更新路由
	 * @param definition
	 * @return
	 */
	public String update(RouteDefinition definition) {
		try {
			log.info("gateway update route {}",definition);
			this.routeDefinitionWriter.delete(Mono.just(definition.getId()));
		} catch (Exception e) {
			return "update fail,not find route  routeId: "+definition.getId();
		}
		try {
			routeDefinitionWriter.save(Mono.just(definition)).subscribe();
			this.publisher.publishEvent(new RefreshRoutesEvent(this));
			return "success";
		} catch (Exception e) {
			return "update route fail";
		}
	}

	/**
	 * 增加路由
	 * @param definition
	 * @return
	 */
	public String add(RouteDefinition definition) {
		try{
			log.info("gateway add route {}",definition);
			routeDefinitionWriter.save(Mono.just(definition)).subscribe();
			this.publisher.publishEvent(new RefreshRoutesEvent(this));
		}catch (Exception e){
			e.printStackTrace();
		}
		return "success";
	}
}

2、动态路由获取nacos配置:

package com.exp.gateway.route;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.egrid.cache.jedis.cache.RedisCache;
import com.egrid.core.util.StringUtil;
import com.exp.gateway.config.GatewayConfig;
import com.exp.gateway.filter.AccessFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.handler.predicate.PredicateDefinition;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.web.util.UriBuilder;
import org.yaml.snakeyaml.Yaml;

import javax.annotation.PostConstruct;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;

/**
 * @创建人 yfc
 * @创建时间 2020-12-07 14:53
 * @描述 通过nacos下发动态路由配置,监听nacos中的gateway-service-gateway配置
 */
@Component
@DependsOn({"gatewayConfig"})
public class DynamicRouteServiceImplByNacos {
	@Autowired
	private DynamicRouteServiceImpl dynamicRouteService;
	@Value("${router.id:huike-gateway-dispose}")
	private String id;
	@Value("${router.order:0}")
	private int order;
	@Value("${router.uri:https://www.gogohk.com/}")
	private String uri;
	@Value("${router.predicates}")
	private String predicates;
	@Value("${auth.excludeAuthenticateUrl}")
	private String excludeAuthenticateUrl;
	@Value("${auth.excludeAuthorizedUrl}")
	private String excludeAuthorizedUrl;
	@Autowired
	private RedisCache redisCache;
	private static final Logger log = LoggerFactory.getLogger(DynamicRouteServiceImplByNacos.class);

	@Autowired
	private GatewayConfig gatewayConfig;

	private ConfigService configService;
	@Autowired
	private AccessFilter accessFilter;

	@PostConstruct
	public void init() {
		log.info("gateway route init...");
		try{
			configService = initConfigService();
			if(configService == null){
				log.warn("initConfigService fail");
				return;
			}
			String configInfo = configService.getConfig(GatewayConfig.NACOS_ROUTE_DATA_ID, GatewayConfig.NACOS_ROUTE_GROUP, GatewayConfig.DEFAULT_TIMEOUT);
			if(StringUtil.isEmpty(configInfo)){
				return;
			}
			log.info("获取网关当前配置:\r\n{}",configInfo);
			Yaml yaml=new Yaml();
			Map<String,Object> map= (Map<String, Object>) yaml.load(configInfo);
			JSONObject jsonObject=new JSONObject(map);
			JSONObject auth=jsonObject.getJSONObject("auth");

			RouteDefinition definition=new RouteDefinition();
			URI routerUri =URI.create (uri);
			definition.setUri(routerUri);
			definition.setId(id);
			definition.setOrder(order);
			PredicateDefinition predicateDefinition=new PredicateDefinition(predicates);
			List<PredicateDefinition> predicateDefinitionList=new ArrayList<>();
			predicateDefinitionList.add(predicateDefinition);
			definition.setPredicates(predicateDefinitionList);
			log.info("update route : {}",auth.toString());
			dynamicRouteService.add(definition);

			accessFilter.setExcludeAuthenticateUrl(auth.get("excludeAuthenticateUrl").toString());
			accessFilter.setExcludeAuthorizedUrl(auth.get("excludeAuthorizedUrl").toString());
		} catch (Exception e) {
			log.error("初始化网关路由时发生错误",e);
		}
		dynamicRouteByNacosListener(GatewayConfig.NACOS_ROUTE_DATA_ID,GatewayConfig.NACOS_ROUTE_GROUP);
	}

	/**
	 * 监听Nacos下发的动态路由配置
	 * @param dataId
	 * @param group
	 */
	public void dynamicRouteByNacosListener (String dataId, String group){
		try {
			configService.addListener(dataId, group, new Listener()  {
				@Override
				public void receiveConfigInfo(String configInfo) {
					log.info("进行网关更新:\n\r{}",configInfo);
					if(StringUtil.isEmpty(configInfo)){
						return;
					}
					Yaml yaml=new Yaml();
					Map<String,Object> map= (Map<String, Object>) yaml.load(configInfo);
					JSONObject jsonObject=new JSONObject(map);
					JSONObject auth=jsonObject.getJSONObject("auth");
					accessFilter.setExcludeAuthenticateUrl(auth.get("excludeAuthenticateUrl").toString());
					accessFilter.setExcludeAuthorizedUrl(auth.get("excludeAuthorizedUrl").toString());
					RouteDefinition definition=new RouteDefinition();
					URI routerUri =URI.create (uri);
					definition.setUri(routerUri);
					definition.setId(id);
					definition.setOrder(order);
					PredicateDefinition predicateDefinition=new PredicateDefinition(predicates);
					List<PredicateDefinition> predicateDefinitionList=new ArrayList<>();
					predicateDefinitionList.add(predicateDefinition);
					definition.setPredicates(predicateDefinitionList);
					dynamicRouteService.update(definition);
				}
				@Override
				public Executor getExecutor() {
					log.info("getExecutor\n\r");
					return null;
				}
			});
		} catch (NacosException e) {
			log.error("从nacos接收动态路由配置出错!!!",e);
		}
	}

	/**
	 * 初始化网关路由 nacos config
	 * @return
	 */
	private ConfigService initConfigService(){
		try{
			Properties properties = new Properties();
			properties.setProperty("serverAddr", GatewayConfig.NACOS_SERVER_ADDR);
			properties.setProperty("namespace",GatewayConfig.NACOS_NAMESPACE);
			return configService= NacosFactory.createConfigService(properties);
		} catch (Exception e) {
			log.error("初始化网关路由时发生错误",e);
			return null;
		}
	}

}

3、nacos路由配置

package com.exp.gateway.route;

import com.alibaba.cloud.nacos.NacosConfigProperties;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.egrid.core.util.StringUtil;
import com.exp.gateway.config.DynamicRouteConfig.GatewayDynamicProperties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.filter.FilterDefinition;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.cloud.gateway.route.RouteDefinitionRepository;
import org.springframework.context.ApplicationEventPublisher;
import org.yaml.snakeyaml.Yaml;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.Executor;

public class NacosRouteDefinitionRepository implements RouteDefinitionRepository {
    private final Map<String, RouteDefinition> routes = Collections.synchronizedMap(new LinkedHashMap());
    private static final Logger LOGGER = LoggerFactory.getLogger(NacosRouteDefinitionRepository.class);
    private static final String JSON = "json";
    private static final String YAML = "yaml";
    private static final String YML = "yml";
    
    private GatewayDynamicProperties properties;
    
    private ApplicationEventPublisher publisher;

    private NacosConfigProperties nacosConfigProperties;
    
    public NacosRouteDefinitionRepository(GatewayDynamicProperties properties, ApplicationEventPublisher publisher, NacosConfigProperties nacosConfigProperties) {
        this.publisher = publisher;
        this.nacosConfigProperties = nacosConfigProperties;
        this.properties = properties;
        addListener();
    }

    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        try {
            if (properties.isOpen) {
                String content = nacosConfigProperties.configServiceInstance().getConfig(properties.ROUTE_DATA_ID, properties.ROUTE_GROUP_ID,5000);
                List<RouteDefinition> routeDefinitions = getListByStr(content);
                return Flux.fromIterable(routeDefinitions);
            }
        } catch (Exception e) {
            LOGGER.error("getRouteDefinitions by nacos error", e);
        }
        return Flux.fromIterable(new ArrayList<>());
    }

    /**
     * 添加Nacos监听
     */
    private void addListener() {
        try {
            nacosConfigProperties.configServiceInstance().addListener(properties.ROUTE_DATA_ID, properties.ROUTE_GROUP_ID, new Listener() {
                @Override
                public Executor getExecutor() {
                    return null;
                }

                @Override
                public void receiveConfigInfo(String configInfo) {
                    publisher.publishEvent(new RefreshRoutesEvent(this));
                }
            });
        } catch (NacosException e) {
            LOGGER.error("nacos-addListener-error", e);
        }
    }

    @Override
    public Mono<Void> save(Mono<RouteDefinition> route) {
        return route.flatMap((r) -> {
            this.routes.put(r.getId(), r);
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> delete(Mono<String> routeId) {
        return null;
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private List<RouteDefinition> getListByStr(String content) throws URISyntaxException {
        if (properties.ROUTE_FILE_EXTENSION.equalsIgnoreCase(JSON)) {
            if (!StringUtil.isEmpty(content)) {
                return JSONObject.parseArray(content, RouteDefinition.class);
            }
        } else if (properties.ROUTE_FILE_EXTENSION.equalsIgnoreCase(YAML) || properties.ROUTE_FILE_EXTENSION.equalsIgnoreCase(YML)) {
            if (!StringUtil.isEmpty(content)) {
                List<RouteDefinition> listRoute = new ArrayList<>();
                Yaml yaml = new Yaml();
                Map map = yaml.load(content);
                List<Map> tmpRoutes = (List<Map>)((Map)((Map)((Map)map.get("spring")).get("cloud")).get("gateway")).get("routes");
                for(Map tmpRoute : tmpRoutes) {
                    StringBuffer strRoute = new StringBuffer();
                    if (tmpRoute.containsKey("id")) {
                        strRoute.append(tmpRoute.get("id")).append("=");
                    } else {
                        strRoute.append(UUID.randomUUID().toString()).append("=");
                    }
                    strRoute.append(tmpRoute.get("uri")).append(",");
                    List<String> tmpPredicates = (List<String>)tmpRoute.get("predicates");
                    for (String tmpPredicate : tmpPredicates) {
                        strRoute.append(tmpPredicate).append(",");
                    }
                    RouteDefinition route = new RouteDefinition(strRoute.toString().substring(0, strRoute.length() - 1));
                    
                    List<String> tmpFilters = (List<String>)tmpRoute.get("filters");
                    if (tmpFilters != null && tmpFilters.size() > 0) {
                        List<FilterDefinition> filters = new ArrayList<>();
                        for (String tmpFilter : tmpFilters) {
                            filters.add(new FilterDefinition(tmpFilter));
                        }
                        route.setFilters(filters);
                    }
                    listRoute.add(route);
                }
                return listRoute;
            }
        }
        
        return new ArrayList<>(0);
    }
}

4、过滤器

package com.exp.gateway.filter;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.egrid.core.util.ScheduledExecutor;
import com.exp.gateway.config.GatewayConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;

import com.egrid.cache.jedis.cache.RedisCache;
import com.egrid.core.threadlocal.pool.TtlExecutors;
import com.egrid.core.util.JsonBeanUtil;
import com.egrid.core.util.StringUtil;
import com.egrid.core.web.response.RestResponse;
import com.egrid.core.web.response.RestStatusCode;
import com.exp.gateway.consumer.AuthConsumer;
import com.exp.gateway.consumer.BehaviorConsumer;
import com.exp.gateway.interceptor.FeignInterceptor;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.RandomRule;
import com.netflix.loadbalancer.ZoneAwareLoadBalancer;

import feign.Feign;
import feign.form.spring.SpringFormEncoder;
import feign.jackson.JacksonDecoder;
import feign.jackson.JacksonEncoder;
import feign.ribbon.LBClient;
import feign.ribbon.LBClientFactory;
import feign.ribbon.RibbonClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AccessFilter implements GlobalFilter, Ordered {
	private static final Logger LOGGER = LoggerFactory.getLogger(AccessFilter.class);

	@Autowired
	private RedisCache shiroRedisCache;

	@Autowired
	private DiscoveryClient discoveryClient;

	@Value("${behavior.service}")
	private String behaviorService;

	@Value("${auth.service:service-auth}")
	private String authService;

	@Value("${auth.headers:cmf-auth-token}")
	private String saveToHeader;

	@Value("${auth.userKey:x-auth-user}")
	private String userKey;

	@Value("${auth.filterUrlStorage:authorizedUrl}")
	private String filterUrlStorage;

	@Value("${auth.excludeAuthenticateUrl}")
	private String excludeAuthenticateUrl;
	@Value("${auth.excludeAuthorizedUrl}")
	private String excludeAuthorizedUrl;

	@Override
	public int getOrder() {
		return 0;
	}

	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		Map<String, Object> user = null;
		String authorizedUrl = getPathWithinApplication(exchange);
		LOGGER.info(String.format("%s AccessFilter request to %s, authorized url is %s",
				exchange.getRequest().getMethod(), exchange.getRequest().getURI(), authorizedUrl));
		try {
			if (!shouldAuthenticateFilter(authorizedUrl)) {
				String token = exchange.getRequest().getHeaders().getFirst(saveToHeader);
				String strUser = (String)shiroRedisCache.get(userKey + ":" + token);
				if (StringUtil.isEmpty(strUser)) {
					return chain.filter(exchange);
				} else {
					try {
						user = JsonBeanUtil.jsonToBean(strUser, HashMap.class);
						user.put("realIp", getRequestIp(exchange.getRequest()));
						ServerHttpRequest request = addHeader(exchange.getRequest(), URLEncoder.encode(JsonBeanUtil.beanToJson(user), "UTF-8"));
						return chain.filter(exchange.mutate().request(request).build());
					} catch (UnsupportedEncodingException e) {
						return badResponse(exchange, HttpStatus.BAD_REQUEST, RestStatusCode.SERVER_UNKNOWN_ERROR,
								new Exception(RestStatusCode.SERVER_UNKNOWN_ERROR.message()));
					} finally {
						shiroRedisCache.expire(userKey + ":" + token, TimeUnit.SECONDS, shiroRedisCache.getExpireSeconds());
						shiroRedisCache.expire(token, TimeUnit.SECONDS, shiroRedisCache.getExpireSeconds());
					}
				}
			} else if (!shouldAuthorizedFilter(authorizedUrl)) {
				String token = exchange.getRequest().getHeaders().getFirst(saveToHeader);
				String strUser = (String)shiroRedisCache.get(userKey + ":" + token);
				if (StringUtil.isEmpty(strUser)) {
					//免鉴权,表示还是需要认证的,如果缓存里没有,表示需要用户再次登录
					return badResponse(exchange, HttpStatus.UNAUTHORIZED, RestStatusCode.SESSION_TIMEOUT,
							new Exception("登录已失效,请重新登录"));
				} else {
					try {
						user = JsonBeanUtil.jsonToBean(strUser, HashMap.class);
						user.put("realIp", getRequestIp(exchange.getRequest()));
						ServerHttpRequest request = addHeader(exchange.getRequest(), URLEncoder.encode(JsonBeanUtil.beanToJson(user), "UTF-8"));
						return chain.filter(exchange.mutate().request(request).build());
					} catch (UnsupportedEncodingException e) {
						return badResponse(exchange, HttpStatus.BAD_REQUEST, RestStatusCode.SERVER_UNKNOWN_ERROR,
								new Exception(RestStatusCode.SERVER_UNKNOWN_ERROR.message()));
					} finally {
						shiroRedisCache.expire(userKey + ":" + token, TimeUnit.SECONDS, shiroRedisCache.getExpireSeconds());
						shiroRedisCache.expire(token, TimeUnit.SECONDS, shiroRedisCache.getExpireSeconds());
					}
				}
			} else {
				/** 获取service-auth服务在注册中心注册的地址 */
				StringBuffer listOfServers = new StringBuffer();
				List<ServiceInstance> serviceInstances = discoveryClient.getInstances(authService);
				for (ServiceInstance instance : serviceInstances) {
					listOfServers.append(instance.getHost()).append(":").append(instance.getPort()).append(",");
				}

				/** 通过Feign调用远程服务,获取认证信息 */
				RibbonClient client = RibbonClient.builder().lbClientFactory(new LBClientFactory() {
					@Override
					public LBClient create(String clientName) {
						IClientConfig config = ClientFactory.getNamedConfig(clientName);
						IClientConfigKey<String> key = CommonClientConfigKey.ListOfServers;
						config.set(key, listOfServers.substring(0, listOfServers.length() - 1));
						ILoadBalancer lb = ClientFactory.getNamedLoadBalancer(clientName);
						@SuppressWarnings("rawtypes")
						ZoneAwareLoadBalancer zb = (ZoneAwareLoadBalancer) lb;
						zb.setRule(new RandomRule());
						return LBClient.create(lb, config);
					}
				}).build();

				/**
				 * Request里已经存在的Header内容
				 */
				String[] arySaveToHeader;
				if (StringUtil.isEmpty(saveToHeader)) {
					arySaveToHeader = new String[] {};
				} else {
					if (saveToHeader.indexOf(",") > 0) {
						arySaveToHeader = saveToHeader.split(",");
					} else {
						arySaveToHeader = saveToHeader.split(" ");
					}
					for (int i = 0; i < arySaveToHeader.length; i++) {
						arySaveToHeader[i] = arySaveToHeader[i].trim();
					}
				}

				/**
				 * 追加的Header内容
				 */
				Map<String, String> addHeader = new HashMap<>();
				addHeader.put(filterUrlStorage, authorizedUrl);
				AuthConsumer service = Feign.builder().requestInterceptor(new FeignInterceptor(exchange.getRequest(), arySaveToHeader, addHeader))
						.client(client).encoder(new JacksonEncoder()).decoder(new JacksonDecoder())
						.target(AuthConsumer.class, "http://" + authService);
				RestResponse<Map<String, Object>> restResponse = service.isPermitted();

				if (restResponse == null) {
					return badResponse(exchange, HttpStatus.BAD_REQUEST, RestStatusCode.SERVER_UNKNOWN_ERROR,
							new Exception(RestStatusCode.SERVER_UNKNOWN_ERROR.message()));
				} else {
					LOGGER.info(String.format("待鉴权的URL是:%s,该URL鉴权结果是:%s", authorizedUrl, restResponse.getCode()));
					if (restResponse.getCode() == RestStatusCode.OK.code()) {
						try {
							user = restResponse.getResult();
							user.put("realIp", getRequestIp(exchange.getRequest()));
							ServerHttpRequest request = addHeader(exchange.getRequest(), URLEncoder.encode(JsonBeanUtil.beanToJson(user), "UTF-8"));
							return chain.filter(exchange.mutate().request(request).build());
						} catch (UnsupportedEncodingException e) {
							return badResponse(exchange, HttpStatus.BAD_REQUEST, RestStatusCode.SERVER_UNKNOWN_ERROR,
									new Exception(RestStatusCode.SERVER_UNKNOWN_ERROR.message()));
						}
					}
					if (restResponse.getCode() == RestStatusCode.UNAUTHENTICED.code()) {
						return badResponse(exchange, HttpStatus.UNAUTHORIZED, RestStatusCode.UNAUTHENTICED,
								new Exception("尚未登录,请登录"));
					}
					if (restResponse.getCode() == RestStatusCode.SESSION_TIMEOUT.code()) {
						return badResponse(exchange, HttpStatus.UNAUTHORIZED, RestStatusCode.SESSION_TIMEOUT,
								new Exception("登录已失效,请重新登录"));
					}
					if (restResponse.getCode() == RestStatusCode.UNAUTHORIZED.code()) {
						return badResponse(exchange, HttpStatus.UNAUTHORIZED, RestStatusCode.UNAUTHORIZED,
								new Exception("没有权限操作该功能,请确认"));
					}
					if (restResponse.getCode() == RestStatusCode.SERVER_UNKNOWN_ERROR.code()) {
						return badResponse(exchange, HttpStatus.BAD_REQUEST, RestStatusCode.SERVER_UNKNOWN_ERROR,
								new Exception(RestStatusCode.SERVER_UNKNOWN_ERROR.message()));
					} else {
						return badResponse(exchange, HttpStatus.BAD_REQUEST, RestStatusCode.SERVER_UNKNOWN_ERROR,
								new Exception(RestStatusCode.SERVER_UNKNOWN_ERROR.message()));
					}
				}
			}
		} finally {
			if (!StringUtil.isEmpty(behaviorService)) {
				if (user != null && user.containsKey("factory") && user.containsKey("brand") && user.containsKey("partyId")) {
					String queryParams = null;
					String bodyParams = null;
					try {
						if (HttpMethod.GET.name().equalsIgnoreCase(exchange.getRequest().getMethodValue())) {
							queryParams = exchange.getRequest().getQueryParams().toString();
							bodyParams = "";
						} else if (HttpMethod.POST.name().equalsIgnoreCase(exchange.getRequest().getMethodValue())
								&& exchange.getRequest().getHeaders().getContentType() != null
								&& !exchange.getRequest().getHeaders().getContentType().equals(MediaType.MULTIPART_FORM_DATA)) {
							queryParams = exchange.getRequest().getQueryParams().toString();
							bodyParams = resolveBodyFromRequest(exchange.getRequest());
						} else {
							queryParams = "";
							bodyParams = "";
						}
					} catch (Exception ex) {
						queryParams = "";
						bodyParams = "";
					}
					ScheduledExecutor.getScheduledExecutor().execute(new ThreadBehavior(discoveryClient, behaviorService, (String)user.get("factory"),
							(String)user.get("brand"), (String)user.get("partyId"), authorizedUrl, queryParams, bodyParams));
				}
			}
		}
	}

	static class ThreadBehavior implements Runnable{
		private DiscoveryClient discoveryClient;
		private String behaviorService;
		private String factoryId;
		private String brandId;
		private String userId;
		private String behaviorTargetUrl;
		private String queryParams;
		private String bodyParams;
		ThreadBehavior(DiscoveryClient discoveryClient, String behaviorService, String factoryId, String brandId, String userId, String behaviorTargetUrl, String queryParams, String bodyParams){
			this.discoveryClient = discoveryClient;
			this.behaviorService = behaviorService;
			this.factoryId = factoryId;
			this.brandId = brandId;
			this.userId = userId;
			this.behaviorTargetUrl = behaviorTargetUrl;
			this.queryParams = queryParams;
			this.bodyParams = bodyParams;
		}
		@Override
		public void run() {
			// 用户行为记录
			try {
				StringBuffer listOfServers = new StringBuffer();
				List<ServiceInstance> serviceInstances = discoveryClient.getInstances(behaviorService);
				for (ServiceInstance instance : serviceInstances) {
					listOfServers.append(instance.getHost()).append(":").append(instance.getPort()).append(",");
				}

				/** 通过Feign调用远程服务,获取认证信息 */
				RibbonClient client = RibbonClient.builder().lbClientFactory(new LBClientFactory() {
					@Override
					public LBClient create(String clientName) {
						IClientConfig config = ClientFactory.getNamedConfig(clientName);
						IClientConfigKey<String> key = CommonClientConfigKey.ListOfServers;
						config.set(key, listOfServers.substring(0, listOfServers.length() - 1));
						ILoadBalancer lb = ClientFactory.getNamedLoadBalancer(clientName);
						@SuppressWarnings("rawtypes")
						ZoneAwareLoadBalancer zb = (ZoneAwareLoadBalancer) lb;
						zb.setRule(new RandomRule());
						return LBClient.create(lb, config);
					}
				}).build();

				BehaviorConsumer service = Feign.builder()
						.client(client).encoder(new SpringFormEncoder()).decoder(new JacksonDecoder())
						.target(BehaviorConsumer.class, "http://" + behaviorService);
				Map<String, Object> params = new HashMap<>();
				params.put("factoryId", factoryId);
				params.put("brandId", brandId);
				params.put("partyId", Long.parseLong(userId));
				params.put("behaviorType", "01");
				params.put("behaviorTarget", "system");
				params.put("behaviorTargetUrl", behaviorTargetUrl);
				params.put("queryParams", queryParams);
				params.put("bodyParams", bodyParams);

				service.record(params);
			} catch (Exception ex) {
				LOGGER.error("用户行为记录出现异常,异常情况:{}" , ex);
			}
		}
	}

	private Mono<Void> badResponse (ServerWebExchange exchange, HttpStatus status, RestStatusCode restStatusCode, Exception exception) {
		ServerHttpResponse response = exchange.getResponse();
		RestResponse<String> restResponseCtx = new RestResponse<>(restStatusCode, exception);
		byte[] bits = JsonBeanUtil.beanToJson(restResponseCtx).getBytes(StandardCharsets.UTF_8);
		DataBuffer buffer = response.bufferFactory().wrap(bits);
		response.setStatusCode(status);
		//指定编码,否则在浏览器中会中文乱码
		response.getHeaders().add("Content-Type", "text/plain;charset=UTF-8");
		return response.writeWith(Mono.just(buffer));
	}


	/*
	 * <p>*Description:增加header内容*</p>**
	 * @param ctx 上下文*
	 * @param request 请求
	 */

	private ServerHttpRequest addHeader(ServerHttpRequest request, String user) {
		return request.mutate().header(userKey, new String[]{encodeHeadInfo(user)}).build();
	}

	private String encodeHeadInfo( String headInfo ) {
		StringBuffer stringBuffer = new StringBuffer();
		for (int i = 0, length = headInfo.length(); i < length; i++) {
			char c = headInfo.charAt(i);
			if (c <= '\u001f' || c >= '\u007f') {
				stringBuffer.append( String.format ("\\u%04x", (int)c) );
			} else {
				stringBuffer.append(c);
			}
		}
		return stringBuffer.toString();
	}

	/**
	 * <p>
	 * Field pathMatcher: 匹配工具类
	 * </p>
	 */
	private final AntPathMatcher pathMatcher = new AntPathMatcher();

	private boolean shouldAuthenticateFilter(String authorizedUrl) {
		if (StringUtil.isEmpty(excludeAuthenticateUrl)) {
			return true;
		} else {
			String[] aryExcludeUrl;
			if (excludeAuthenticateUrl.indexOf(",") > 0) {
				aryExcludeUrl = excludeAuthenticateUrl.split(",");
			} else {
				aryExcludeUrl = excludeAuthenticateUrl.split(" ");
			}
			for (int i = 0; i < aryExcludeUrl.length; i++) {
				//如果匹配到url则不再过滤
				if (pathMatcher.match(aryExcludeUrl[i].trim(), authorizedUrl)) {
					return false;
				}
			}
			return true;// 是否执行该过滤器,此处为true,说明需要过滤
		}
	}

	private boolean shouldAuthorizedFilter(String authorizedUrl) {
		if (StringUtil.isEmpty(excludeAuthorizedUrl)) {
			return true;
		} else {
			String[] aryExcludeUrl;
			if (excludeAuthorizedUrl.indexOf(",") > 0) {
				aryExcludeUrl = excludeAuthorizedUrl.split(",");
			} else {
				aryExcludeUrl = excludeAuthorizedUrl.split(" ");
			}
			for (int i = 0; i < aryExcludeUrl.length; i++) {
				//如果匹配到url则不再过滤
				if (pathMatcher.match(aryExcludeUrl[i].trim(), authorizedUrl)) {
					return false;
				}
			}
			return true;// 是否执行该过滤器,此处为true,说明需要过滤
		}
	}

	/**
	 * <p>
	 * Description: 获取请求ip 暂时用不到
	 * </p>
	 *
	 * @param request 请求
	 * @return
	 */
	private String getRequestIp(ServerHttpRequest request) {
		String ip = request.getHeaders().getFirst("X-Forwarded-For");
		if (!StringUtils.isEmpty(ip) && !"unKnown".equalsIgnoreCase(ip)) {
			//多次反向代理后会有多个ip值,第一个ip才是真实ip
			int index = ip.indexOf(",");
			if (index != -1) {
				return ip.substring(0, index);
			} else {
				return ip;
			}
		}
		ip = request.getHeaders().getFirst("X-Real-IP");
		if (!StringUtils.isEmpty(ip) && !"unKnown".equalsIgnoreCase(ip)) {
			return ip;
		}
		String uri = request.getURI().toString();

		return uri.substring(0, uri.indexOf(request.getPath().toString()));
	}

	private static String getPathWithinApplication(ServerWebExchange exchange) {
		String path = normalize(exchange.getRequest().getPath().toString());
		return (StringUtils.hasText(path) ? path : "/");
	}

	private static String normalize(String path) {
		return normalize(path, true);
	}

	/**
	 * Normalize a relative URI path that may have relative values ("/./",
	 * "/../", and so on ) it it.  <strong>WARNING</strong> - This method is
	 * useful only for normalizing application-generated paths.  It does not
	 * try to perform security checks for malicious input.
	 * Normalize operations were was happily taken from org.apache.catalina.util.RequestUtil in
	 * Tomcat trunk, r939305
	 *
	 * @param path             Relative path to be normalized
	 * @param replaceBackSlash Should '\\' be replaced with '/'
	 * @return normalized path
	 */
	private static String normalize(String path, boolean replaceBackSlash) {

		if (path == null)
			return null;

		// Create a place for the normalized path
		String normalized = path;

		if (replaceBackSlash && normalized.indexOf('\\') >= 0)
			normalized = normalized.replace('\\', '/');

		if (normalized.equals("/."))
			return "/";

		// Add a leading "/" if necessary
		if (!normalized.startsWith("/"))
			normalized = "/" + normalized;

		// Resolve occurrences of "//" in the normalized path
		while (true) {
			int index = normalized.indexOf("//");
			if (index < 0)
				break;
			normalized = normalized.substring(0, index) +
					normalized.substring(index + 1);
		}

		// Resolve occurrences of "/./" in the normalized path
		while (true) {
			int index = normalized.indexOf("/./");
			if (index < 0)
				break;
			normalized = normalized.substring(0, index) +
					normalized.substring(index + 2);
		}

		// Resolve occurrences of "/../" in the normalized path
		while (true) {
			int index = normalized.indexOf("/../");
			if (index < 0)
				break;
			if (index == 0)
				return (null);  // Trying to go outside our context
			int index2 = normalized.lastIndexOf('/', index - 1);
			normalized = normalized.substring(0, index2) +
					normalized.substring(index + 3);
		}

		// Return the normalized path that we have completed
		return (normalized);
	}

	private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest) {
		//获取请求体
		Flux<DataBuffer> body = serverHttpRequest.getBody();
		StringBuilder sb = new StringBuilder();
		body.subscribe(buffer -> {
			byte[] bytes = new byte[buffer.readableByteCount()];
			buffer.read(bytes);
			sb.append(new String(bytes, StandardCharsets.UTF_8));
			// 创建ByteBuf对象后,它的引用计数是1,当你每次调用DataBufferUtils.release之后会释放引用计数对象时,它的引用计数减1,如果引用计数为0,这个引用计数对象会被释放(deallocate),并返回对象池。
			// 使用相同的方法来获取body数据,超过2次就会提示refCnt: 0,如果想避免该问题可以不进行release(注释DataBufferUtils.release(buffer))
			DataBufferUtils.release(buffer);
		});
		return sb.toString();
	}

	static class ScheduledExecutor {

		private static class LazyHolder {
			private static Thread _ShutdownThread;
			static ScheduledExecutorService _ScheduledExecutor = null;
			static {
				_ScheduledExecutor = TtlExecutors.getTtlScheduledExecutorService(
						new ScheduledThreadPoolExecutor(20, new CmfaThreadFactory("GatewayExecutor")));
				_ShutdownThread = new Thread(new Runnable() {
					public void run() {
						shutdownExecutorPool();
					}
				});
				Runtime.getRuntime().addShutdownHook(_ShutdownThread);
			}

			private static void shutdownExecutorPool() {
				if (_ScheduledExecutor != null) {
					_ScheduledExecutor.shutdown();
					if (_ShutdownThread != null) {
						try {
							Runtime.getRuntime().removeShutdownHook(_ShutdownThread);
						} catch (IllegalStateException ise) {
						}
					}
				}
			}
		}

		static ScheduledExecutorService getScheduledExecutor() {
			return LazyHolder._ScheduledExecutor;
		}
	}

	static class CmfaThreadFactory implements ThreadFactory {
		private static final AtomicInteger poolNumber = new AtomicInteger(1);
		private final ThreadGroup group;
		private final AtomicInteger threadNumber = new AtomicInteger(1);
		private final String namePrefix;

		CmfaThreadFactory(String threadName) {
			SecurityManager s = System.getSecurityManager();
			group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
			namePrefix = "BehaviorPool-" + poolNumber.getAndIncrement() + "-" + threadName + "-";
		}

		public Thread newThread(Runnable r) {
			Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
			if (t.isDaemon())
				t.setDaemon(false);
			if (t.getPriority() != Thread.NORM_PRIORITY)
				t.setPriority(Thread.NORM_PRIORITY);
			return t;
		}
	}

	public String getExcludeAuthenticateUrl() {
		return excludeAuthenticateUrl;
	}

	public void setExcludeAuthenticateUrl(String excludeAuthenticateUrl) {
		this.excludeAuthenticateUrl = excludeAuthenticateUrl;
	}

	public String getExcludeAuthorizedUrl() {
		return excludeAuthorizedUrl;
	}

	public void setExcludeAuthorizedUrl(String excludeAuthorizedUrl) {
		this.excludeAuthorizedUrl = excludeAuthorizedUrl;
	}
}

以上对应的nacos配置没有加

相关标签: 网关