动态路由
程序员文章站
2022-03-03 20:45:49
...
最近基于项目要求,使用 Nacos 配置网关实现了动态路由,这里记录一下实现步骤,如下:
1、路由接口实现类:
package com.exp.gateway.route;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.cloud.gateway.route.RouteDefinitionWriter;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
/**
 * Adds, updates and deletes gateway route definitions at runtime through the injected
 * {@link RouteDefinitionWriter} and publishes a {@link RefreshRoutesEvent} after every
 * change that did not report a failure.
 *
 * <p>Every method returns a short status string instead of throwing.
 *
 * @author yfc
 * @since 2020-12-07 14:56
 */
@Service
public class DynamicRouteServiceImpl implements ApplicationEventPublisherAware {

    private static final Logger log = LoggerFactory.getLogger(DynamicRouteServiceImpl.class);

    @Autowired
    private RouteDefinitionWriter routeDefinitionWriter;

    /** Event publisher; supplied by Spring through {@link #setApplicationEventPublisher}. */
    private ApplicationEventPublisher publisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }

    /**
     * Deletes a route.
     *
     * @param id id of the route to remove
     * @return {@code "delete success"} when the writer reported no error, otherwise {@code "delete fail"}
     */
    public String delete(String id) {
        log.info("gateway delete route id {}", id);
        try {
            Throwable failure = run(removal(id));
            if (failure != null) {
                log.error("gateway delete route {} failed", id, failure);
                return "delete fail";
            }
            refreshRoutes();
            return "delete success";
        } catch (Exception e) {
            log.error("gateway delete route {} failed", id, e);
            return "delete fail";
        }
    }

    /**
     * Replaces a route: removes the definition stored under the same id (if any) and saves
     * the new one.
     *
     * @param definition the new route definition
     * @return {@code "success"} when the writer reported no error, otherwise {@code "update route fail"}
     */
    public String update(RouteDefinition definition) {
        log.info("gateway update route {}", definition);
        try {
            String routeId = definition.getId();
            Mono<Void> replacement = removal(routeId)
                    // A failed removal (typically: the route does not exist yet) must not
                    // prevent the new definition from being stored.
                    .onErrorResume(e -> {
                        log.debug("route {} was not removed before update: {}", routeId, e.toString());
                        return Mono.empty();
                    })
                    .then(routeDefinitionWriter.save(Mono.just(definition)));
            Throwable failure = run(replacement);
            if (failure != null) {
                log.error("gateway update route {} failed", routeId, failure);
                return "update route fail";
            }
            refreshRoutes();
            return "success";
        } catch (Exception e) {
            log.error("gateway update route failed", e);
            return "update route fail";
        }
    }

    /**
     * Adds a route.
     *
     * @param definition the route definition to store
     * @return {@code "success"} when the writer reported no error, otherwise {@code "add route fail"}
     */
    public String add(RouteDefinition definition) {
        log.info("gateway add route {}", definition);
        try {
            Throwable failure = run(routeDefinitionWriter.save(Mono.just(definition)));
            if (failure != null) {
                log.error("gateway add route failed", failure);
                return "add route fail";
            }
            refreshRoutes();
            return "success";
        } catch (Exception e) {
            log.error("gateway add route failed", e);
            return "add route fail";
        }
    }

    /** Builds the delete operation; tolerates a writer that returns {@code null} instead of a Mono. */
    private Mono<Void> removal(String id) {
        Mono<Void> removal = routeDefinitionWriter.delete(Mono.just(id));
        return removal != null ? removal : Mono.empty();
    }

    /**
     * Subscribes to the operation (a Mono does nothing until subscribed) and returns the
     * error it signalled, or {@code null}. Only errors signalled before {@code subscribe}
     * returns are visible here; a writer that completes on another thread is reported as
     * successful.
     */
    private static Throwable run(Mono<Void> operation) {
        Throwable[] failure = new Throwable[1];
        operation.subscribe(ignored -> { }, error -> failure[0] = error);
        return failure[0];
    }

    /** Publishes the refresh event that makes the gateway reload its route definitions. */
    private void refreshRoutes() {
        this.publisher.publishEvent(new RefreshRoutesEvent(this));
    }
}
2、动态路由获取nacos配置:
package com.exp.gateway.route;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.egrid.cache.jedis.cache.RedisCache;
import com.egrid.core.util.StringUtil;
import com.exp.gateway.config.GatewayConfig;
import com.exp.gateway.filter.AccessFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.handler.predicate.PredicateDefinition;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.web.util.UriBuilder;
import org.yaml.snakeyaml.Yaml;
import javax.annotation.PostConstruct;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
* @创建人 yfc
* @创建时间 2020-12-07 14:53
* @描述 通过nacos下发动态路由配置,监听nacos中的gateway-service-gateway配置
*/
@Component
@DependsOn({"gatewayConfig"})
public class DynamicRouteServiceImplByNacos {
@Autowired
private DynamicRouteServiceImpl dynamicRouteService;
@Value("${router.id:huike-gateway-dispose}")
private String id;
@Value("${router.order:0}")
private int order;
@Value("${router.uri:https://www.gogohk.com/}")
private String uri;
@Value("${router.predicates}")
private String predicates;
@Value("${auth.excludeAuthenticateUrl}")
private String excludeAuthenticateUrl;
@Value("${auth.excludeAuthorizedUrl}")
private String excludeAuthorizedUrl;
@Autowired
private RedisCache redisCache;
private static final Logger log = LoggerFactory.getLogger(DynamicRouteServiceImplByNacos.class);
@Autowired
private GatewayConfig gatewayConfig;
private ConfigService configService;
@Autowired
private AccessFilter accessFilter;
@PostConstruct
public void init() {
log.info("gateway route init...");
try{
configService = initConfigService();
if(configService == null){
log.warn("initConfigService fail");
return;
}
String configInfo = configService.getConfig(GatewayConfig.NACOS_ROUTE_DATA_ID, GatewayConfig.NACOS_ROUTE_GROUP, GatewayConfig.DEFAULT_TIMEOUT);
if(StringUtil.isEmpty(configInfo)){
return;
}
log.info("获取网关当前配置:\r\n{}",configInfo);
Yaml yaml=new Yaml();
Map<String,Object> map= (Map<String, Object>) yaml.load(configInfo);
JSONObject jsonObject=new JSONObject(map);
JSONObject auth=jsonObject.getJSONObject("auth");
RouteDefinition definition=new RouteDefinition();
URI routerUri =URI.create (uri);
definition.setUri(routerUri);
definition.setId(id);
definition.setOrder(order);
PredicateDefinition predicateDefinition=new PredicateDefinition(predicates);
List<PredicateDefinition> predicateDefinitionList=new ArrayList<>();
predicateDefinitionList.add(predicateDefinition);
definition.setPredicates(predicateDefinitionList);
log.info("update route : {}",auth.toString());
dynamicRouteService.add(definition);
accessFilter.setExcludeAuthenticateUrl(auth.get("excludeAuthenticateUrl").toString());
accessFilter.setExcludeAuthorizedUrl(auth.get("excludeAuthorizedUrl").toString());
} catch (Exception e) {
log.error("初始化网关路由时发生错误",e);
}
dynamicRouteByNacosListener(GatewayConfig.NACOS_ROUTE_DATA_ID,GatewayConfig.NACOS_ROUTE_GROUP);
}
/**
* 监听Nacos下发的动态路由配置
* @param dataId
* @param group
*/
public void dynamicRouteByNacosListener (String dataId, String group){
try {
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
log.info("进行网关更新:\n\r{}",configInfo);
if(StringUtil.isEmpty(configInfo)){
return;
}
Yaml yaml=new Yaml();
Map<String,Object> map= (Map<String, Object>) yaml.load(configInfo);
JSONObject jsonObject=new JSONObject(map);
JSONObject auth=jsonObject.getJSONObject("auth");
accessFilter.setExcludeAuthenticateUrl(auth.get("excludeAuthenticateUrl").toString());
accessFilter.setExcludeAuthorizedUrl(auth.get("excludeAuthorizedUrl").toString());
RouteDefinition definition=new RouteDefinition();
URI routerUri =URI.create (uri);
definition.setUri(routerUri);
definition.setId(id);
definition.setOrder(order);
PredicateDefinition predicateDefinition=new PredicateDefinition(predicates);
List<PredicateDefinition> predicateDefinitionList=new ArrayList<>();
predicateDefinitionList.add(predicateDefinition);
definition.setPredicates(predicateDefinitionList);
dynamicRouteService.update(definition);
}
@Override
public Executor getExecutor() {
log.info("getExecutor\n\r");
return null;
}
});
} catch (NacosException e) {
log.error("从nacos接收动态路由配置出错!!!",e);
}
}
/**
* 初始化网关路由 nacos config
* @return
*/
private ConfigService initConfigService(){
try{
Properties properties = new Properties();
properties.setProperty("serverAddr", GatewayConfig.NACOS_SERVER_ADDR);
properties.setProperty("namespace",GatewayConfig.NACOS_NAMESPACE);
return configService= NacosFactory.createConfigService(properties);
} catch (Exception e) {
log.error("初始化网关路由时发生错误",e);
return null;
}
}
}
3、nacos路由配置
package com.exp.gateway.route;
import com.alibaba.cloud.nacos.NacosConfigProperties;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.egrid.core.util.StringUtil;
import com.exp.gateway.config.DynamicRouteConfig.GatewayDynamicProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.filter.FilterDefinition;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.cloud.gateway.route.RouteDefinitionRepository;
import org.springframework.context.ApplicationEventPublisher;
import org.yaml.snakeyaml.Yaml;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.Executor;
/**
 * Route definition repository that combines two sources: the route document stored in
 * Nacos (re-read on every {@link #getRouteDefinitions()} call while
 * {@code properties.isOpen} is set) and the definitions stored programmatically through
 * {@link #save(Mono)}.
 *
 * <p>A Nacos listener publishes a {@link RefreshRoutesEvent} whenever the route document changes.
 */
public class NacosRouteDefinitionRepository implements RouteDefinitionRepository {
    private static final Logger LOGGER = LoggerFactory.getLogger(NacosRouteDefinitionRepository.class);
    private static final String JSON = "json";
    private static final String YAML = "yaml";
    private static final String YML = "yml";

    /** Definitions stored through {@link #save(Mono)}, keyed by route id, in insertion order. */
    private final Map<String, RouteDefinition> routes = Collections.synchronizedMap(new LinkedHashMap<>());
    private final GatewayDynamicProperties properties;
    private final ApplicationEventPublisher publisher;
    private final NacosConfigProperties nacosConfigProperties;

    public NacosRouteDefinitionRepository(GatewayDynamicProperties properties, ApplicationEventPublisher publisher, NacosConfigProperties nacosConfigProperties) {
        this.publisher = publisher;
        this.nacosConfigProperties = nacosConfigProperties;
        this.properties = properties;
        addListener();
    }

    /**
     * Returns the routes described by the Nacos document followed by the routes stored
     * through {@link #save(Mono)}. A failure to read or parse the Nacos document is logged
     * and only drops the Nacos part.
     */
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        List<RouteDefinition> definitions = new ArrayList<>();
        try {
            if (properties.isOpen) {
                String content = nacosConfigProperties.configServiceInstance().getConfig(properties.ROUTE_DATA_ID, properties.ROUTE_GROUP_ID, 5000);
                definitions.addAll(getListByStr(content));
            }
        } catch (Exception e) {
            LOGGER.error("getRouteDefinitions by nacos error", e);
        }
        // Iterating a synchronizedMap view requires holding the map's own lock.
        synchronized (routes) {
            definitions.addAll(routes.values());
        }
        return Flux.fromIterable(definitions);
    }

    /**
     * Registers the Nacos listener that triggers a route refresh on every change.
     */
    private void addListener() {
        try {
            nacosConfigProperties.configServiceInstance().addListener(properties.ROUTE_DATA_ID, properties.ROUTE_GROUP_ID, new Listener() {
                @Override
                public Executor getExecutor() {
                    return null;
                }

                @Override
                public void receiveConfigInfo(String configInfo) {
                    // The new content is not used here; getRouteDefinitions() re-reads it.
                    publisher.publishEvent(new RefreshRoutesEvent(this));
                }
            });
        } catch (NacosException e) {
            LOGGER.error("nacos-addListener-error", e);
        }
    }

    @Override
    public Mono<Void> save(Mono<RouteDefinition> route) {
        return route.flatMap(r -> {
            this.routes.put(r.getId(), r);
            return Mono.empty();
        });
    }

    /**
     * Removes a definition previously stored through {@link #save(Mono)}. Routes that come
     * from the Nacos document cannot be removed here. Completes normally whether or not
     * the id was present.
     */
    @Override
    public Mono<Void> delete(Mono<String> routeId) {
        return routeId.flatMap(id -> {
            this.routes.remove(id);
            return Mono.empty();
        });
    }

    /**
     * Converts the Nacos document into route definitions according to
     * {@code properties.ROUTE_FILE_EXTENSION}: a JSON array of route definitions, or a
     * YAML document with the routes under {@code spring.cloud.gateway.routes}.
     *
     * <p>YAML predicates and filters must be written in shortcut text form
     * ({@code Name=arg}). Each route is assembled as {@code id=uri,predicate,...} text, so
     * a predicate that itself contains a comma is split at that comma.
     *
     * @return the parsed routes; empty when the content is empty, the extension is not
     *         recognised or the document has no routes
     */
    @SuppressWarnings("unchecked")
    private List<RouteDefinition> getListByStr(String content) throws URISyntaxException {
        if (StringUtil.isEmpty(content)) {
            return new ArrayList<>(0);
        }
        String extension = properties.ROUTE_FILE_EXTENSION;
        if (JSON.equalsIgnoreCase(extension)) {
            List<RouteDefinition> parsed = JSONObject.parseArray(content, RouteDefinition.class);
            return parsed != null ? parsed : new ArrayList<>(0);
        }
        if (!YAML.equalsIgnoreCase(extension) && !YML.equalsIgnoreCase(extension)) {
            return new ArrayList<>(0);
        }

        Object root = new Yaml().load(content);
        Object routesNode = child(child(child(child(root, "spring"), "cloud"), "gateway"), "routes");
        List<RouteDefinition> listRoute = new ArrayList<>();
        if (!(routesNode instanceof List)) {
            return listRoute;
        }
        for (Map<?, ?> tmpRoute : (List<Map<?, ?>>) routesNode) {
            StringBuilder strRoute = new StringBuilder();
            // Routes without an explicit id get a random one.
            strRoute.append(tmpRoute.containsKey("id") ? tmpRoute.get("id") : UUID.randomUUID().toString());
            strRoute.append('=').append(tmpRoute.get("uri"));
            List<String> tmpPredicates = (List<String>) tmpRoute.get("predicates");
            if (tmpPredicates != null) {
                for (String tmpPredicate : tmpPredicates) {
                    strRoute.append(',').append(tmpPredicate);
                }
            }
            RouteDefinition route = new RouteDefinition(strRoute.toString());
            List<String> tmpFilters = (List<String>) tmpRoute.get("filters");
            if (tmpFilters != null && !tmpFilters.isEmpty()) {
                List<FilterDefinition> filters = new ArrayList<>();
                for (String tmpFilter : tmpFilters) {
                    filters.add(new FilterDefinition(tmpFilter));
                }
                route.setFilters(filters);
            }
            listRoute.add(route);
        }
        return listRoute;
    }

    /** Returns {@code node.get(key)} when {@code node} is a map, otherwise {@code null}. */
    private static Object child(Object node, String key) {
        return node instanceof Map ? ((Map<?, ?>) node).get(key) : null;
    }
}
4、过滤器
package com.exp.gateway.filter;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.egrid.core.util.ScheduledExecutor;
import com.exp.gateway.config.GatewayConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;
import com.egrid.cache.jedis.cache.RedisCache;
import com.egrid.core.threadlocal.pool.TtlExecutors;
import com.egrid.core.util.JsonBeanUtil;
import com.egrid.core.util.StringUtil;
import com.egrid.core.web.response.RestResponse;
import com.egrid.core.web.response.RestStatusCode;
import com.exp.gateway.consumer.AuthConsumer;
import com.exp.gateway.consumer.BehaviorConsumer;
import com.exp.gateway.interceptor.FeignInterceptor;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.RandomRule;
import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
import feign.Feign;
import feign.form.spring.SpringFormEncoder;
import feign.jackson.JacksonDecoder;
import feign.jackson.JacksonEncoder;
import feign.ribbon.LBClient;
import feign.ribbon.LBClientFactory;
import feign.ribbon.RibbonClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class AccessFilter implements GlobalFilter, Ordered {
private static final Logger LOGGER = LoggerFactory.getLogger(AccessFilter.class);
@Autowired
private RedisCache shiroRedisCache;
@Autowired
private DiscoveryClient discoveryClient;
@Value("${behavior.service}")
private String behaviorService;
@Value("${auth.service:service-auth}")
private String authService;
@Value("${auth.headers:cmf-auth-token}")
private String saveToHeader;
@Value("${auth.userKey:x-auth-user}")
private String userKey;
@Value("${auth.filterUrlStorage:authorizedUrl}")
private String filterUrlStorage;
@Value("${auth.excludeAuthenticateUrl}")
private String excludeAuthenticateUrl;
@Value("${auth.excludeAuthorizedUrl}")
private String excludeAuthorizedUrl;
/**
 * Order value reported for this filter through the {@link Ordered} contract.
 *
 * @return always {@code 0}
 */
@Override
public int getOrder() {
return 0;
}
/**
 * Authenticates / authorises the request and forwards it with the user information
 * attached as a request header.
 *
 * <ul>
 * <li>URL excluded from authentication: the cached user (if any) is attached, otherwise
 * the request is forwarded unchanged.</li>
 * <li>URL excluded from authorisation only: a cached user is required, otherwise a 401
 * session-timeout response is written.</li>
 * <li>Any other URL: the auth service is asked for permission and its status code is
 * mapped to the response.</li>
 * </ul>
 *
 * <p>Whatever the outcome, the request is handed to behaviour recording afterwards when
 * a user was resolved.
 *
 * <p>NOTE(review): the Redis lookup and the Feign call are made synchronously inside this
 * method; confirm that blocking here is acceptable for the gateway's threading model.
 *
 * @param exchange the current exchange
 * @param chain    the remaining filter chain
 * @return completion signal of the forwarded request or of the error response
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    Map<String, Object> user = null;
    String authorizedUrl = getPathWithinApplication(exchange);
    LOGGER.info(String.format("%s AccessFilter request to %s, authorized url is %s",
            exchange.getRequest().getMethod(), exchange.getRequest().getURI(), authorizedUrl));
    try {
        boolean authenticationRequired = shouldAuthenticateFilter(authorizedUrl);
        if (!authenticationRequired || !shouldAuthorizedFilter(authorizedUrl)) {
            String token = exchange.getRequest().getHeaders().getFirst(saveToHeader);
            // Without a token there is nothing to look up in the cache.
            String strUser = StringUtil.isEmpty(token) ? null : (String) shiroRedisCache.get(userKey + ":" + token);
            if (StringUtil.isEmpty(strUser)) {
                if (!authenticationRequired) {
                    return chain.filter(exchange);
                }
                // Exempt from authorisation only: the user still has to be logged in.
                return badResponse(exchange, HttpStatus.UNAUTHORIZED, RestStatusCode.SESSION_TIMEOUT,
                        new Exception("登录已失效,请重新登录"));
            }
            try {
                user = JsonBeanUtil.jsonToBean(strUser, HashMap.class);
                return forwardWithUser(exchange, chain, user);
            } finally {
                // Sliding expiration: every use of the session extends both cache entries.
                shiroRedisCache.expire(userKey + ":" + token, TimeUnit.SECONDS, shiroRedisCache.getExpireSeconds());
                shiroRedisCache.expire(token, TimeUnit.SECONDS, shiroRedisCache.getExpireSeconds());
            }
        }

        RestResponse<Map<String, Object>> restResponse = requestPermission(exchange, authorizedUrl);
        if (restResponse == null) {
            return badResponse(exchange, HttpStatus.BAD_REQUEST, RestStatusCode.SERVER_UNKNOWN_ERROR,
                    new Exception(RestStatusCode.SERVER_UNKNOWN_ERROR.message()));
        }
        LOGGER.info(String.format("待鉴权的URL是:%s,该URL鉴权结果是:%s", authorizedUrl, restResponse.getCode()));
        if (restResponse.getCode() == RestStatusCode.OK.code()) {
            user = restResponse.getResult();
            return forwardWithUser(exchange, chain, user);
        }
        if (restResponse.getCode() == RestStatusCode.UNAUTHENTICED.code()) {
            return badResponse(exchange, HttpStatus.UNAUTHORIZED, RestStatusCode.UNAUTHENTICED,
                    new Exception("尚未登录,请登录"));
        }
        if (restResponse.getCode() == RestStatusCode.SESSION_TIMEOUT.code()) {
            return badResponse(exchange, HttpStatus.UNAUTHORIZED, RestStatusCode.SESSION_TIMEOUT,
                    new Exception("登录已失效,请重新登录"));
        }
        if (restResponse.getCode() == RestStatusCode.UNAUTHORIZED.code()) {
            return badResponse(exchange, HttpStatus.UNAUTHORIZED, RestStatusCode.UNAUTHORIZED,
                    new Exception("没有权限操作该功能,请确认"));
        }
        // Explicit server error and every unrecognised code are reported the same way.
        return badResponse(exchange, HttpStatus.BAD_REQUEST, RestStatusCode.SERVER_UNKNOWN_ERROR,
                new Exception(RestStatusCode.SERVER_UNKNOWN_ERROR.message()));
    } finally {
        recordBehavior(exchange, user, authorizedUrl);
    }
}

/**
 * Adds the caller's IP to the user map, attaches the URL-encoded JSON form of the map as
 * a request header and continues the chain. A missing user map or an unsupported
 * encoding is answered with a 400 server-error response.
 */
private Mono<Void> forwardWithUser(ServerWebExchange exchange, GatewayFilterChain chain, Map<String, Object> user) {
    if (user == null) {
        return badResponse(exchange, HttpStatus.BAD_REQUEST, RestStatusCode.SERVER_UNKNOWN_ERROR,
                new Exception(RestStatusCode.SERVER_UNKNOWN_ERROR.message()));
    }
    try {
        user.put("realIp", getRequestIp(exchange.getRequest()));
        ServerHttpRequest request = addHeader(exchange.getRequest(), URLEncoder.encode(JsonBeanUtil.beanToJson(user), "UTF-8"));
        return chain.filter(exchange.mutate().request(request).build());
    } catch (UnsupportedEncodingException e) {
        return badResponse(exchange, HttpStatus.BAD_REQUEST, RestStatusCode.SERVER_UNKNOWN_ERROR,
                new Exception(RestStatusCode.SERVER_UNKNOWN_ERROR.message()));
    }
}

/**
 * Asks the auth service, through a Feign client balanced over the instances currently
 * registered for it, whether the request is permitted.
 *
 * @return the service response, or {@code null} when no instance of the auth service is
 *         registered or the service returned nothing
 */
private RestResponse<Map<String, Object>> requestPermission(ServerWebExchange exchange, String authorizedUrl) {
    // Addresses of the auth service instances known to the registry.
    List<ServiceInstance> serviceInstances = discoveryClient.getInstances(authService);
    if (serviceInstances == null || serviceInstances.isEmpty()) {
        LOGGER.error("no instance of auth service {} is registered", authService);
        return null;
    }
    StringBuilder servers = new StringBuilder();
    for (ServiceInstance instance : serviceInstances) {
        if (servers.length() > 0) {
            servers.append(",");
        }
        servers.append(instance.getHost()).append(":").append(instance.getPort());
    }
    final String listOfServers = servers.toString();

    RibbonClient client = RibbonClient.builder().lbClientFactory(new LBClientFactory() {
        @Override
        public LBClient create(String clientName) {
            IClientConfig config = ClientFactory.getNamedConfig(clientName);
            IClientConfigKey<String> key = CommonClientConfigKey.ListOfServers;
            config.set(key, listOfServers);
            ILoadBalancer lb = ClientFactory.getNamedLoadBalancer(clientName);
            @SuppressWarnings("rawtypes")
            ZoneAwareLoadBalancer zb = (ZoneAwareLoadBalancer) lb;
            zb.setRule(new RandomRule());
            return LBClient.create(lb, config);
        }
    }).build();

    // Header names taken from the saveToHeader setting (comma or blank separated).
    String[] arySaveToHeader;
    if (StringUtil.isEmpty(saveToHeader)) {
        arySaveToHeader = new String[] {};
    } else {
        arySaveToHeader = saveToHeader.indexOf(",") > 0 ? saveToHeader.split(",") : saveToHeader.split(" ");
        for (int i = 0; i < arySaveToHeader.length; i++) {
            arySaveToHeader[i] = arySaveToHeader[i].trim();
        }
    }

    // Additional header telling the auth service which URL is being checked.
    Map<String, String> addHeader = new HashMap<>();
    addHeader.put(filterUrlStorage, authorizedUrl);

    AuthConsumer service = Feign.builder().requestInterceptor(new FeignInterceptor(exchange.getRequest(), arySaveToHeader, addHeader))
            .client(client).encoder(new JacksonEncoder()).decoder(new JacksonDecoder())
            .target(AuthConsumer.class, "http://" + authService);
    return service.isPermitted();
}

/**
 * Hands the request to behaviour recording when a behaviour service is configured and
 * the user carries {@code factory}, {@code brand} and {@code partyId}. Best effort: any
 * failure is logged and never affects the response.
 */
private void recordBehavior(ServerWebExchange exchange, Map<String, Object> user, String authorizedUrl) {
    if (StringUtil.isEmpty(behaviorService)) {
        return;
    }
    if (user == null || !user.containsKey("factory") || !user.containsKey("brand") || !user.containsKey("partyId")) {
        return;
    }
    try {
        String queryParams;
        String bodyParams;
        try {
            if (HttpMethod.GET.name().equalsIgnoreCase(exchange.getRequest().getMethodValue())) {
                queryParams = exchange.getRequest().getQueryParams().toString();
                bodyParams = "";
            } else if (HttpMethod.POST.name().equalsIgnoreCase(exchange.getRequest().getMethodValue())
                    && exchange.getRequest().getHeaders().getContentType() != null
                    && !exchange.getRequest().getHeaders().getContentType().equals(MediaType.MULTIPART_FORM_DATA)) {
                queryParams = exchange.getRequest().getQueryParams().toString();
                // NOTE(review): this subscribes to the request body before the rest of the
                // chain does; confirm the body is still readable downstream.
                bodyParams = resolveBodyFromRequest(exchange.getRequest());
            } else {
                queryParams = "";
                bodyParams = "";
            }
        } catch (Exception ex) {
            queryParams = "";
            bodyParams = "";
        }
        ScheduledExecutor.getScheduledExecutor().execute(new ThreadBehavior(discoveryClient, behaviorService, (String) user.get("factory"),
                (String) user.get("brand"), (String) user.get("partyId"), authorizedUrl, queryParams, bodyParams));
    } catch (RuntimeException ex) {
        // Runs inside the caller's finally block: must not replace the real outcome.
        LOGGER.warn("behavior record for {} could not be scheduled", authorizedUrl, ex);
    }
}
/**
 * Background task that reports one user action to the behaviour service through a Feign
 * client balanced over the instances currently registered for that service. Failures
 * are logged and never propagate.
 */
static class ThreadBehavior implements Runnable {
    private final DiscoveryClient discoveryClient;
    private final String behaviorService;
    private final String factoryId;
    private final String brandId;
    private final String userId;
    private final String behaviorTargetUrl;
    private final String queryParams;
    private final String bodyParams;

    ThreadBehavior(DiscoveryClient discoveryClient, String behaviorService, String factoryId, String brandId, String userId, String behaviorTargetUrl, String queryParams, String bodyParams) {
        this.discoveryClient = discoveryClient;
        this.behaviorService = behaviorService;
        this.factoryId = factoryId;
        this.brandId = brandId;
        this.userId = userId;
        this.behaviorTargetUrl = behaviorTargetUrl;
        this.queryParams = queryParams;
        this.bodyParams = bodyParams;
    }

    @Override
    public void run() {
        // Record the user behaviour.
        try {
            List<ServiceInstance> serviceInstances = discoveryClient.getInstances(behaviorService);
            if (serviceInstances == null || serviceInstances.isEmpty()) {
                // Nothing to call; report it plainly instead of failing on an empty server list.
                LOGGER.warn("no instance of behavior service {} is registered, record skipped", behaviorService);
                return;
            }
            StringBuilder servers = new StringBuilder();
            for (ServiceInstance instance : serviceInstances) {
                if (servers.length() > 0) {
                    servers.append(",");
                }
                servers.append(instance.getHost()).append(":").append(instance.getPort());
            }
            final String listOfServers = servers.toString();

            RibbonClient client = RibbonClient.builder().lbClientFactory(new LBClientFactory() {
                @Override
                public LBClient create(String clientName) {
                    IClientConfig config = ClientFactory.getNamedConfig(clientName);
                    IClientConfigKey<String> key = CommonClientConfigKey.ListOfServers;
                    config.set(key, listOfServers);
                    ILoadBalancer lb = ClientFactory.getNamedLoadBalancer(clientName);
                    @SuppressWarnings("rawtypes")
                    ZoneAwareLoadBalancer zb = (ZoneAwareLoadBalancer) lb;
                    zb.setRule(new RandomRule());
                    return LBClient.create(lb, config);
                }
            }).build();
            BehaviorConsumer service = Feign.builder()
                    .client(client).encoder(new SpringFormEncoder()).decoder(new JacksonDecoder())
                    .target(BehaviorConsumer.class, "http://" + behaviorService);

            Map<String, Object> params = new HashMap<>();
            params.put("factoryId", factoryId);
            params.put("brandId", brandId);
            params.put("partyId", Long.parseLong(userId));
            params.put("behaviorType", "01");
            params.put("behaviorTarget", "system");
            params.put("behaviorTargetUrl", behaviorTargetUrl);
            params.put("queryParams", queryParams);
            params.put("bodyParams", bodyParams);
            service.record(params);
        } catch (Exception ex) {
            LOGGER.error("用户行为记录出现异常,异常情况:{}" , ex);
        }
    }
}
/**
 * Writes an error response: the given HTTP status and a body holding the JSON form of a
 * {@code RestResponse} built from the status code and the exception.
 *
 * @param exchange       exchange whose response is written
 * @param status         HTTP status to set
 * @param restStatusCode application status code placed in the body
 * @param exception      exception placed in the body
 * @return completion signal of the write
 */
private Mono<Void> badResponse(ServerWebExchange exchange, HttpStatus status, RestStatusCode restStatusCode, Exception exception) {
    RestResponse<String> payload = new RestResponse<>(restStatusCode, exception);
    byte[] json = JsonBeanUtil.beanToJson(payload).getBytes(StandardCharsets.UTF_8);

    ServerHttpResponse response = exchange.getResponse();
    response.setStatusCode(status);
    // State the charset explicitly, otherwise Chinese text is garbled in browsers.
    response.getHeaders().add("Content-Type", "text/plain;charset=UTF-8");

    DataBuffer body = response.bufferFactory().wrap(json);
    return response.writeWith(Mono.just(body));
}
/**
 * Returns a copy of the request that carries the user information in the header named
 * by {@code userKey}.
 *
 * @param request the incoming request
 * @param user    user information; passed through {@code encodeHeadInfo} before use
 * @return the mutated request
 */
private ServerHttpRequest addHeader(ServerHttpRequest request, String user) {
    String headerValue = encodeHeadInfo(user);
    return request.mutate().header(userKey, headerValue).build();
}
/**
 * Makes a string safe for use as a header value: every character outside the printable
 * ASCII range (at or below 0x1f, at or above 0x7f) is replaced by a backslash, the
 * letter u and the four-digit lower-case hex code of the character.
 *
 * @param headInfo text to encode
 * @return the encoded text
 */
private String encodeHeadInfo(String headInfo) {
    StringBuilder encoded = new StringBuilder();
    for (char ch : headInfo.toCharArray()) {
        boolean printableAscii = ch > '\u001f' && ch < '\u007f';
        if (printableAscii) {
            encoded.append(ch);
        } else {
            encoded.append(String.format("\\u%04x", (int) ch));
        }
    }
    return encoded.toString();
}
/**
* <p>
* Field pathMatcher: 匹配工具类
* </p>
*/
private final AntPathMatcher pathMatcher = new AntPathMatcher();
/**
 * Tells whether authentication applies to the URL.
 *
 * @param authorizedUrl normalised request path
 * @return {@code false} when the path matches a pattern of {@code excludeAuthenticateUrl},
 *         {@code true} otherwise (including when the list is empty)
 */
private boolean shouldAuthenticateFilter(String authorizedUrl) {
    return !matchesAnyPattern(excludeAuthenticateUrl, authorizedUrl);
}

/**
 * Tells whether authorisation applies to the URL.
 *
 * @param authorizedUrl normalised request path
 * @return {@code false} when the path matches a pattern of {@code excludeAuthorizedUrl},
 *         {@code true} otherwise (including when the list is empty)
 */
private boolean shouldAuthorizedFilter(String authorizedUrl) {
    return !matchesAnyPattern(excludeAuthorizedUrl, authorizedUrl);
}

/**
 * Checks the URL against a list of Ant-style patterns. The list is split on commas when
 * it contains one after its first character, otherwise on single blanks; each pattern
 * is trimmed before matching.
 *
 * <p>The list is received as an argument so that the caller's field, which can be
 * replaced at runtime through its setter, is read exactly once per check.
 *
 * @return {@code true} when at least one pattern matches; {@code false} for an empty list
 */
private boolean matchesAnyPattern(String patternList, String url) {
    if (StringUtil.isEmpty(patternList)) {
        return false;
    }
    String[] patterns = patternList.indexOf(",") > 0 ? patternList.split(",") : patternList.split(" ");
    for (String pattern : patterns) {
        if (pathMatcher.match(pattern.trim(), url)) {
            return true;
        }
    }
    return false;
}
/**
 * Determines the IP address of the caller.
 *
 * <p>Order of preference: first entry of {@code X-Forwarded-For}, then {@code X-Real-IP},
 * then the remote address of the connection. Header values equal to "unknown" (any
 * case) are ignored. Only when no remote address is available does the method fall back
 * to the part of the request URI that precedes the path.
 *
 * @param request the incoming request
 * @return the caller's address as text
 */
private String getRequestIp(ServerHttpRequest request) {
    String forwarded = request.getHeaders().getFirst("X-Forwarded-For");
    if (StringUtils.hasText(forwarded) && !"unKnown".equalsIgnoreCase(forwarded.trim())) {
        // After several reverse proxies the header holds several addresses; the first one is the client.
        int comma = forwarded.indexOf(",");
        String first = comma != -1 ? forwarded.substring(0, comma) : forwarded;
        return first.trim();
    }
    String realIp = request.getHeaders().getFirst("X-Real-IP");
    if (StringUtils.hasText(realIp) && !"unKnown".equalsIgnoreCase(realIp.trim())) {
        return realIp.trim();
    }
    java.net.InetSocketAddress remote = request.getRemoteAddress();
    if (remote != null) {
        return remote.getAddress() != null ? remote.getAddress().getHostAddress() : remote.getHostString();
    }
    // Last resort, kept from the previous implementation: the URI prefix before the path.
    String uri = request.getURI().toString();
    int pathStart = uri.indexOf(request.getPath().toString());
    return pathStart >= 0 ? uri.substring(0, pathStart) : uri;
}
/**
 * Returns the normalised request path, or {@code "/"} when normalisation yields no text.
 *
 * @param exchange the current exchange
 * @return the path used for the authentication / authorisation checks
 */
private static String getPathWithinApplication(ServerWebExchange exchange) {
    String rawPath = exchange.getRequest().getPath().toString();
    String path = normalize(rawPath);
    if (StringUtils.hasText(path)) {
        return path;
    }
    return "/";
}
/**
 * Normalises a path with backslashes converted to forward slashes.
 *
 * @param path path to normalise
 * @return the normalised path; see the two-argument overload
 */
private static String normalize(String path) {
    final boolean replaceBackSlash = true;
    return normalize(path, replaceBackSlash);
}

/**
 * Normalises a relative URI path: adds a leading slash when missing, collapses doubled
 * slashes and resolves "/./" and "/../" segments.
 *
 * <p><strong>WARNING</strong> - intended for application-generated paths only; it
 * performs no security checks for malicious input. The algorithm was taken from
 * org.apache.catalina.util.RequestUtil in Tomcat trunk, r939305.
 *
 * @param path             relative path to normalise
 * @param replaceBackSlash whether backslashes are replaced with forward slashes first
 * @return the normalised path; {@code null} when {@code path} is {@code null} or a
 *         "/../" segment would climb above the root
 */
private static String normalize(String path, boolean replaceBackSlash) {
    if (path == null) {
        return null;
    }

    String result = path;
    if (replaceBackSlash && result.indexOf('\\') >= 0) {
        result = result.replace('\\', '/');
    }
    if ("/.".equals(result)) {
        return "/";
    }
    // Guarantee a leading slash.
    if (!result.startsWith("/")) {
        result = "/" + result;
    }

    // Collapse every "//" into a single slash.
    for (int at = result.indexOf("//"); at >= 0; at = result.indexOf("//")) {
        result = result.substring(0, at) + result.substring(at + 1);
    }

    // Drop "/./" segments.
    for (int at = result.indexOf("/./"); at >= 0; at = result.indexOf("/./")) {
        result = result.substring(0, at) + result.substring(at + 2);
    }

    // Resolve "/../" by removing the segment in front of it.
    for (int at = result.indexOf("/../"); at >= 0; at = result.indexOf("/../")) {
        if (at == 0) {
            // Would leave the context root.
            return null;
        }
        int parentStart = result.lastIndexOf('/', at - 1);
        result = result.substring(0, parentStart) + result.substring(at + 3);
    }

    return result;
}
/**
 * Reads the request body as UTF-8 text.
 *
 * <p>The bytes of all buffers are collected first and decoded once, so a multi-byte
 * character that is split across two buffers is decoded correctly.
 *
 * <p>NOTE(review): the result contains only what the body publisher emitted before
 * {@code subscribe} returned; if the body is delivered asynchronously the returned text
 * may be incomplete. Subscribing here also consumes the body - confirm it can still be
 * read by later consumers.
 *
 * @param serverHttpRequest request whose body is read
 * @return the body text received so far; empty when nothing was emitted
 */
private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest) {
    Flux<DataBuffer> body = serverHttpRequest.getBody();
    java.io.ByteArrayOutputStream collected = new java.io.ByteArrayOutputStream();
    body.subscribe(buffer -> {
        try {
            byte[] bytes = new byte[buffer.readableByteCount()];
            buffer.read(bytes);
            collected.write(bytes, 0, bytes.length);
        } finally {
            // A buffer starts with a reference count of 1; release returns it to the pool
            // once its content has been copied, also when copying fails.
            DataBufferUtils.release(buffer);
        }
    });
    return new String(collected.toByteArray(), StandardCharsets.UTF_8);
}
/**
 * Holder of the shared scheduled executor used for background work. The executor (20
 * core threads, wrapped by {@code TtlExecutors}) is created when the holder class is
 * initialised, i.e. on the first call of {@link #getScheduledExecutor()}, together with
 * a JVM shutdown hook that shuts it down.
 */
static class ScheduledExecutor {

    /** Returns the shared executor, creating it on first use. */
    static ScheduledExecutorService getScheduledExecutor() {
        return LazyHolder.executor;
    }

    private static class LazyHolder {
        static final ScheduledExecutorService executor = TtlExecutors.getTtlScheduledExecutorService(
                new ScheduledThreadPoolExecutor(20, new CmfaThreadFactory("GatewayExecutor")));
        private static final Thread shutdownHook = new Thread(LazyHolder::shutdownExecutorPool);

        static {
            Runtime.getRuntime().addShutdownHook(shutdownHook);
        }

        /** Shuts the executor down and tries to unregister the hook. */
        private static void shutdownExecutorPool() {
            if (executor == null) {
                return;
            }
            executor.shutdown();
            if (shutdownHook == null) {
                return;
            }
            try {
                Runtime.getRuntime().removeShutdownHook(shutdownHook);
            } catch (IllegalStateException ise) {
                // Thrown when the JVM is already shutting down; nothing left to do.
            }
        }
    }
}
/**
 * Thread factory producing non-daemon, normal-priority threads named
 * {@code BehaviorPool-<pool>-<threadName>-<n>}, where {@code <pool>} counts the factories
 * created so far and {@code <n>} counts the threads of this factory.
 */
static class CmfaThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final String namePrefix;

    CmfaThreadFactory(String threadName) {
        SecurityManager securityManager = System.getSecurityManager();
        if (securityManager != null) {
            group = securityManager.getThreadGroup();
        } else {
            group = Thread.currentThread().getThreadGroup();
        }
        namePrefix = "BehaviorPool-" + poolNumber.getAndIncrement() + "-" + threadName + "-";
    }

    public Thread newThread(Runnable r) {
        String name = namePrefix + threadNumber.getAndIncrement();
        Thread created = new Thread(group, r, name, 0);
        // Threads inherit daemon status and priority from their creator; reset both.
        if (created.isDaemon()) {
            created.setDaemon(false);
        }
        if (created.getPriority() != Thread.NORM_PRIORITY) {
            created.setPriority(Thread.NORM_PRIORITY);
        }
        return created;
    }
}
/**
 * @return the pattern list read by {@code shouldAuthenticateFilter}; matching URLs skip authentication
 */
public String getExcludeAuthenticateUrl() {
return excludeAuthenticateUrl;
}
/**
 * Replaces the authentication exclude list.
 *
 * @param excludeAuthenticateUrl patterns separated by commas or blanks
 */
public void setExcludeAuthenticateUrl(String excludeAuthenticateUrl) {
this.excludeAuthenticateUrl = excludeAuthenticateUrl;
}
/**
 * @return the pattern list read by {@code shouldAuthorizedFilter}; matching URLs skip authorisation
 */
public String getExcludeAuthorizedUrl() {
return excludeAuthorizedUrl;
}
/**
 * Replaces the authorisation exclude list.
 *
 * @param excludeAuthorizedUrl patterns separated by commas or blanks
 */
public void setExcludeAuthorizedUrl(String excludeAuthorizedUrl) {
this.excludeAuthorizedUrl = excludeAuthorizedUrl;
}
}
以上代码对应的 Nacos 配置内容没有在本文中给出。
上一篇: 前后端联调出现的错误