springboot+zuul实现自定义过滤器、动态路由、动态负载。
参考:
Zuul是Netflix开源的微服务网关,它的核心是一系列的过滤器,通过这些过滤器我们可以轻松地实现服务的访问认证、限流、路由、负载、熔断等功能。
基于对已有项目代码零侵入的需求,本文没有将zuul网关项目注册到eureka中心,而是将zuul与springboot结合作为一个独立的项目进行请求转发,因此本项目是非spring cloud架构。
开始编写zuul网关项目
首先,新建一个Spring Boot项目。加入zuul依赖,并在启动类上添加@EnableZuulProxy注解。
pom.xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zuul</artifactId>
    <version>1.4.4.RELEASE</version>
</dependency>
application.properties
server.port=8090
eureka.client.enabled=false
zuul.ribbon.eager-load.enabled=true

zuul.SendErrorFilter.post.disable=true
由于后续会使用到动态路由,所以这里我们并不需要在application.properties中做网关地址转发映射。
SpringBootZuulApplication.java
1 package com.syher.zuul; 2 3 import com.google.common.util.concurrent.threadfactorybuilder; 4 import com.syher.zuul.core.zuul.router.propertiesrouter; 5 import org.springframework.beans.factory.annotation.autowired; 6 import org.springframework.boot.commandlinerunner; 7 import org.springframework.boot.springapplication; 8 import org.springframework.boot.autoconfigure.enableautoconfiguration; 9 import org.springframework.cloud.netflix.zuul.enablezuulproxy; 10 import org.springframework.cloud.netflix.zuul.routesrefreshedevent; 11 import org.springframework.cloud.netflix.zuul.filters.routelocator; 12 import org.springframework.context.applicationeventpublisher; 13 import org.springframework.context.annotation.componentscan; 14 15 import java.io.file; 16 import java.util.concurrent.executors; 17 import java.util.concurrent.scheduledexecutorservice; 18 import java.util.concurrent.timeunit; 19 20 /** 21 * @author braska 22 * @date 2018/06/25. 23 **/ 24 @enableautoconfiguration 25 @enablezuulproxy 26 @componentscan(basepackages = { 27 "com.syher.zuul.core", 28 "com.syher.zuul.service" 29 }) 30 public class springbootzuulapplication implements commandlinerunner { 31 @autowired 32 applicationeventpublisher publisher; 33 @autowired 34 routelocator routelocator; 35 36 private scheduledexecutorservice executor; 37 private long lastmodified = 0l; 38 private boolean instance = true; 39 40 public static void main(string[] args) { 41 springapplication.run(springbootzuulapplication.class, args); 42 } 43 44 @override 45 public void run(string... 
args) throws exception { 46 executor = executors.newsinglethreadscheduledexecutor( 47 new threadfactorybuilder().setnameformat("properties read.").build() 48 ); 49 executor.schedulewithfixeddelay(() -> publish(), 0, 1, timeunit.seconds); 50 } 51 52 private void publish() { 53 if (ispropertiesmodified()) { 54 publisher.publishevent(new routesrefreshedevent(routelocator)); 55 } 56 } 57 58 private boolean ispropertiesmodified() { 59 file file = new file(this.getclass().getclassloader().getresource(propertiesrouter.properties_file).getpath()); 60 if (instance) { 61 instance = false; 62 return false; 63 } 64 if (file.lastmodified() > lastmodified) { 65 lastmodified = file.lastmodified(); 66 return true; 67 } 68 return false; 69 } 70 }
一、自定义过滤器
自定义zuul过滤器比较简单。我们先讲过滤器。
zuul过滤器分为pre、route、post、error四种类型。作用我就不详细讲了,网上资料一大把。本文主要写路由前的过滤,即pre类型。
要自定义一个过滤器,只需要继承ZuulFilter,然后指定过滤类型、过滤顺序、是否执行这个过滤器、过滤内容就OK了。
为了便于扩展,这里用到了模板方法模式。
AbstractZuulFilter.java
1 package com.syher.zuul.core.zuul.filter; 2 3 import com.netflix.zuul.zuulfilter; 4 import com.netflix.zuul.context.requestcontext; 5 import com.syher.zuul.core.zuul.contantvalue; 6 7 /** 8 * @author braska 9 * @date 2018/06/29. 10 **/ 11 public abstract class abstractzuulfilter extends zuulfilter { 12 13 protected requestcontext context; 14 15 @override 16 public boolean shouldfilter() { 17 requestcontext ctx = requestcontext.getcurrentcontext(); 18 return (boolean) (ctx.getordefault(contantvalue.next_filter, true)); 19 } 20 21 @override 22 public object run() { 23 context = requestcontext.getcurrentcontext(); 24 return dorun(); 25 } 26 27 public abstract object dorun(); 28 29 public object fail(integer code, string message) { 30 context.set(contantvalue.next_filter, false); 31 context.setsendzuulresponse(false); 32 context.getresponse().setcontenttype("text/html;charset=utf-8"); 33 context.setresponsestatuscode(code); 34 context.setresponsebody(string.format("{\"result\":\"%s!\"}", message)); 35 return null; 36 } 37 38 public object success() { 39 context.set(contantvalue.next_filter, true); 40 return null; 41 } 42 }
定义pre过滤器的抽象类,继承AbstractZuulFilter。指定pre类型,之后所有的pre过滤器都可以继承这个抽象类。
AbstractPreZuulFilter.java
1 package com.syher.zuul.core.zuul.filter.pre; 2 3 import com.syher.zuul.core.zuul.filtertype; 4 import com.syher.zuul.core.zuul.filter.abstractzuulfilter; 5 6 /** 7 * @author braska 8 * @date 2018/06/29. 9 **/ 10 public abstract class abstractprezuulfilter extends abstractzuulfilter { 11 @override 12 public string filtertype() { 13 return filtertype.pre.name(); 14 } 15 }
接着编写一个具体的过滤器,比如限流。
RateLimiterFilter.java
1 package com.syher.zuul.core.zuul.filter.pre; 2 3 import com.google.common.util.concurrent.ratelimiter; 4 import com.syher.zuul.core.zuul.filterorder; 5 import org.slf4j.logger; 6 import org.slf4j.loggerfactory; 7 8 import javax.servlet.http.httpservletrequest; 9 10 /** 11 * @author braska 12 * @date 2018/06/29. 13 **/ 14 public class ratelimiterfilter extends abstractprezuulfilter { 15 16 private static final logger logger = loggerfactory.getlogger(ratelimiterfilter.class); 17 18 /** 19 * 每秒允许处理的量是50 20 */ 21 ratelimiter ratelimiter = ratelimiter.create(50); 22 23 @override 24 public int filterorder() { 25 return filterorder.rate_limiter_order; 26 } 27 28 @override 29 public object dorun() { 30 httpservletrequest request = context.getrequest(); 31 string url = request.getrequesturi(); 32 if (ratelimiter.tryacquire()) { 33 return success(); 34 } else { 35 logger.info("rate limit:{}", url); 36 return fail(401, string.format("rate limit:{}", url)); 37 } 38 } 39 }
其他类型的过滤器也一样。创建不同的抽象类,比如AbstractPostZuulFilter,指定filterType,然后具体的post过滤器只要继承该抽象类即可。
最后,将过滤器托管给spring。
ZuulConfigure.java
1 package com.syher.zuul.core.config; 2 3 import com.netflix.loadbalancer.irule; 4 import com.netflix.zuul.zuulfilter; 5 import com.syher.zuul.core.ribbon.serverloadbalancerrule; 6 import com.syher.zuul.core.zuul.filter.pre.ratelimiterfilter; 7 import com.syher.zuul.core.zuul.filter.pre.tokenaccessfilter; 8 import com.syher.zuul.core.zuul.filter.pre.userrightfilter; 9 import com.syher.zuul.core.zuul.router.propertiesrouter; 10 import org.springframework.beans.factory.annotation.autowired; 11 import org.springframework.boot.autoconfigure.web.serverproperties; 12 import org.springframework.cloud.netflix.zuul.filters.zuulproperties; 13 import org.springframework.context.annotation.bean; 14 import org.springframework.context.annotation.configuration; 15 16 /** 17 * @author braska 18 * @date 2018/07/05. 19 **/ 20 @configuration 21 public class zuulconfigure { 22 23 @autowired 24 zuulproperties zuulproperties; 25 @autowired 26 serverproperties server; 27 28 /** 29 * 动态路由 30 * @return 31 */ 32 @bean 33 public propertiesrouter propertiesrouter() { 34 return new propertiesrouter(this.server.getservletprefix(), this.zuulproperties); 35 } 36 37 /** 38 * 动态负载 39 * @return 40 */ 41 @bean 42 public irule loadbalance() { 43 return new serverloadbalancerrule(); 44 } 45 46 /** 47 * 自定义过滤器 48 * @return 49 */ 50 @bean 51 public zuulfilter ratelimiterfilter() { 52 return new ratelimiterfilter(); 53 } 54 }
二、动态路由
接着写动态路由。动态路由需要配置可持久化且能动态刷新。
Zuul默认使用的路由是SimpleRouteLocator,不具备动态刷新的效果。DiscoveryClientRouteLocator具备刷新功能,但是需要已有的项目将服务注册到Eureka,这不符合已有项目代码零侵入的需求,所以排除。那么还有个办法就是自定义路由:继承SimpleRouteLocator并实现RefreshableRouteLocator接口。
部分代码如下:
AbstractDynamicRouter.java
1 package com.syher.zuul.core.zuul.router; 2 3 import com.syher.zuul.core.zuul.entity.basicroute; 4 import org.apache.commons.lang.stringutils; 5 import org.slf4j.logger; 6 import org.slf4j.loggerfactory; 7 import org.springframework.beans.beanutils; 8 import org.springframework.cloud.netflix.zuul.filters.refreshableroutelocator; 9 import org.springframework.cloud.netflix.zuul.filters.simpleroutelocator; 10 import org.springframework.cloud.netflix.zuul.filters.zuulproperties; 11 12 import java.util.linkedhashmap; 13 import java.util.list; 14 import java.util.map; 15 16 /** 17 * @author braska 18 * @date 2018/07/02. 19 **/ 20 public abstract class abstractdynamicrouter extends simpleroutelocator implements refreshableroutelocator { 21 22 private static final logger logger = loggerfactory.getlogger(abstractdynamicrouter.class); 23 24 public abstractdynamicrouter(string servletpath, zuulproperties properties) { 25 super(servletpath, properties); 26 } 27 28 @override 29 public void refresh() { 30 dorefresh(); 31 } 32 33 @override 34 protected map<string, zuulproperties.zuulroute> locateroutes() { 35 linkedhashmap<string, zuulproperties.zuulroute> routes = new linkedhashmap<string, zuulproperties.zuulroute>(); 36 routes.putall(super.locateroutes()); 37 38 list<basicroute> results = readroutes(); 39 40 for (basicroute result : results) { 41 if (stringutils.isempty(result.getpath()) ) { 42 continue; 43 } 44 zuulproperties.zuulroute zuulroute = new zuulproperties.zuulroute(); 45 try { 46 beanutils.copyproperties(result, zuulroute); 47 } catch (exception e) { 48 logger.error("=============load zuul route info from db with error==============", e); 49 } 50 routes.put(zuulroute.getpath(), zuulroute); 51 } 52 return routes; 53 } 54 55 /** 56 * 读取路由信息 57 * @return 58 */ 59 protected abstract list<basicroute> readroutes(); 60 }
由于本人比较懒,不想每次写个demo都要重新配置一大堆数据库信息,所以本文很多数据(比如路由信息、负载策略)要么写在文本里面,要么直接用Java代码构造。
本demo的路由信息就是从properties文件里面读取。嗯,继承AbstractDynamicRouter即可。
PropertiesRouter.java
1 package com.syher.zuul.core.zuul.router; 2 3 import com.google.common.collect.lists; 4 import com.google.common.util.concurrent.threadfactorybuilder; 5 import com.syher.zuul.common.context; 6 import com.syher.zuul.core.zuul.entity.basicroute; 7 import org.apache.commons.lang.stringutils; 8 import org.slf4j.logger; 9 import org.slf4j.loggerfactory; 10 import org.springframework.cloud.netflix.zuul.filters.zuulproperties; 11 12 import java.io.file; 13 import java.io.ioexception; 14 import java.util.hashmap; 15 import java.util.list; 16 import java.util.map; 17 import java.util.properties; 18 import java.util.concurrent.executors; 19 import java.util.concurrent.scheduledexecutorservice; 20 import java.util.stream.collectors; 21 22 /** 23 * @author braska 24 * @date 2018/07/02. 25 **/ 26 public class propertiesrouter extends abstractdynamicrouter { 27 28 private static final logger logger = loggerfactory.getlogger(propertiesrouter.class); 29 public static final string properties_file = "router.properties"; 30 private static final string zuul_router_prefix = "zuul.routes"; 31 32 33 public propertiesrouter(string servletpath, zuulproperties properties) { 34 super(servletpath, properties); 35 } 36 37 @override 38 protected list<basicroute> readroutes() { 39 list<basicroute> list = lists.newarraylistwithexpectedsize(3); 40 try { 41 properties prop = new properties(); 42 prop.load( 43 this.getclass().getclassloader().getresourceasstream(properties_file) 44 ); 45 46 context context = new context(new hashmap<>((map) prop)); 47 map<string, string> data = context.getsubproperties(zuul_router_prefix); 48 list<string> ids = data.keyset().stream().map(s -> s.substring(0, s.indexof("."))).distinct().collect(collectors.tolist()); 49 ids.stream().foreach(id -> { 50 map<string, string> router = context.getsubproperties(string.join(".", zuul_router_prefix, id)); 51 52 string path = router.get("path"); 53 path = path.startswith("/") ? 
path : "/" + path; 54 55 string serviceid = router.getordefault("serviceid", null); 56 string url = router.getordefault("url", null); 57 58 basicroute basicroute = new basicroute(); 59 basicroute.setid(id); 60 basicroute.setpath(path); 61 basicroute.seturl(router.getordefault("url", null)); 62 basicroute.setserviceid((stringutils.isblank(url) && stringutils.isblank(serviceid)) ? id : serviceid); 63 basicroute.setretryable(boolean.parseboolean(router.getordefault("retry-able", "false"))); 64 basicroute.setstripprefix(boolean.parseboolean(router.getordefault("strip-prefix", "false"))); 65 list.add(basicroute); 66 }); 67 } catch (ioexception e) { 68 logger.info("error to read " + properties_file + " :{}", e); 69 } 70 return list; 71 } 72 }
既然是动态路由实时刷新,那肯定需要一个定时器定时监控properties文件。所以我在启动类SpringBootZuulApplication加了个定时器监控properties是否发生过变更(之前有疑问的现在可以解惑了)。一旦文件被修改过就重新发布一次RoutesRefreshedEvent事件,然后会触发RouteLocator的refresh方法。
1 public void publish() { 2 if (ispropertiesmodified()) { 3 publisher.publishevent(new routesrefreshedevent(routelocator)); 4 } 5 }
当然,如果是从数据库或者其他地方比如redis读取就不需要用到定时器,只要在增删改的时候直接publish就好了。
最后,记得将PropertiesRouter类交由Spring托管(在ZuulConfigure类中配置bean)。
router.properties文件:
zuul.routes.dashboard.path=/**
zuul.routes.dashboard.strip-prefix=true

##不使用动态负载需指定url
##zuul.routes.dashboard.url=http://localhost:9000/
##zuul服务部署后,动态增加网关映射,无需重启即可实时路由到新的网关
##zuul.routes.baidu.path=/**
三、动态负载
负载也算比较简单,复杂点的是写负载算法。
动态负载主要分两个步骤:
1、根据网关项目配置的host和port去数据库(我是用Java直接造的数据)查找负载策略,比如轮询、随机、IP哈希等等。
2、根据策略结合每台服务器分配的权重选出合适的服务。
实现动态负载需要自定义Rule类并继承AbstractLoadBalancerRule类。
首先看负载策略的选择:
ServerLoadBalancerRule.java
1 package com.syher.zuul.core.ribbon; 2 3 import com.google.common.base.preconditions; 4 import com.netflix.client.config.iclientconfig; 5 import com.netflix.loadbalancer.abstractloadbalancerrule; 6 import com.netflix.loadbalancer.iloadbalancer; 7 import com.netflix.loadbalancer.server; 8 import com.syher.zuul.common.util.systemutil; 9 import com.syher.zuul.core.ribbon.balancer.loadbalancer; 10 import com.syher.zuul.core.ribbon.balancer.randomloadbalancer; 11 import com.syher.zuul.core.ribbon.balancer.roundloadbalancer; 12 import com.syher.zuul.entity.gatewayaddress; 13 import com.syher.zuul.service.gatewayservice; 14 import org.apache.commons.lang.stringutils; 15 import org.slf4j.logger; 16 import org.slf4j.loggerfactory; 17 import org.springframework.beans.factory.annotation.autowired; 18 import org.springframework.beans.factory.annotation.value; 19 20 /** 21 * @author braska 22 * @date 2018/07/05. 23 **/ 24 public class serverloadbalancerrule extends abstractloadbalancerrule { 25 26 private static final logger logger = loggerfactory.getlogger(serverloadbalancerrule.class); 27 28 @value("${server.host:127.0.0.1}") 29 private string host; 30 @value("${server.port:8080}") 31 private integer port; 32 33 @autowired 34 private gatewayservice gatewayservice; 35 36 @override 37 public void initwithniwsconfig(iclientconfig iclientconfig) { 38 } 39 40 @override 41 public server choose(object key) { 42 return getserver(getloadbalancer(), key); 43 } 44 45 private server getserver(iloadbalancer loadbalancer, object key) { 46 if (stringutils.isblank(host)) { 47 host = systemutil.iplist().get(0); 48 } 49 //preconditions.checkargument(host != null, "server.host must be specify."); 50 //preconditions.checkargument(port != null, "server.port must be specify."); 51 52 gatewayaddress address = gatewayservice.getbyhostandport(host, port); 53 if (address == null) { //这里的逻辑可以改,找不到网关配置信息可以指定默认的负载策略 54 logger.error(string.format("must be config a gateway info for the server[%s:%s].", 
host, string.valueof(port))); 55 return null; 56 } 57 58 loadbalancer balancer = loadbalancerfactory.build(address.getfkstrategyid()); 59 60 return balancer.chooseserver(loadbalancer); 61 } 62 63 static class loadbalancerfactory { 64 65 public static loadbalancer build(string strategy) { 66 gatewayaddress.strategytype type = gatewayaddress.strategytype.of(strategy); 67 switch (type) { 68 case round: 69 return new roundloadbalancer(); 70 case random: 71 return new randomloadbalancer(); 72 default: 73 return null; 74 } 75 } 76 } 77 }
然后是负载算法接口代码。
LoadBalancer.java
1 package com.syher.zuul.core.ribbon.balancer; 2 3 import com.netflix.loadbalancer.iloadbalancer; 4 import com.netflix.loadbalancer.server; 5 6 /** 7 * @author braska 8 * @date 2018/07/06. 9 **/ 10 public interface loadbalancer { 11 12 /** 13 * choose a loadbalancer 14 * @param loadbalancer 15 * @return 16 */ 17 server chooseserver(iloadbalancer loadbalancer); 18 }
定义抽象类,实现LoadBalancer接口。
AbstractLoadBalancer.java
1 package com.syher.zuul.core.ribbon.balancer; 2 3 import com.netflix.loadbalancer.iloadbalancer; 4 import com.netflix.loadbalancer.server; 5 import com.syher.zuul.core.springcontext; 6 import com.syher.zuul.service.serverservice; 7 import org.slf4j.logger; 8 import org.slf4j.loggerfactory; 9 10 /** 11 * @author braska 12 * @date 2018/07/06. 13 **/ 14 public abstract class abstractloadbalancer implements loadbalancer { 15 private static final logger logger = loggerfactory.getlogger(abstractloadbalancer.class); 16 protected serverservice serverservice; 17 18 @override 19 public server chooseserver(iloadbalancer loadbalancer) { 20 this.serverservice = springcontext.getbean(serverservice.class); 21 server server = choose(loadbalancer); 22 if (server != null) { 23 logger.info(string.format("the server[%s:%s] has been select.", server.gethost(), server.getport())); 24 } else { 25 logger.error("could not find any server."); 26 } 27 return server; 28 } 29 30 public abstract server choose(iloadbalancer loadbalancer); 31 }
轮询负载算法
RoundLoadBalancer.java
1 package com.syher.zuul.core.ribbon.balancer; 2 3 import com.netflix.loadbalancer.iloadbalancer; 4 import com.netflix.loadbalancer.server; 5 import com.syher.zuul.common.constant; 6 import com.syher.zuul.core.globalcache; 7 import com.syher.zuul.core.ribbon.loadbalancerruleutil; 8 import com.syher.zuul.entity.serveraddress; 9 10 import java.util.list; 11 12 /** 13 * 权重轮询 14 * 首次使用取最大权重的服务器。而后通过权重的不断递减,寻找适合的服务器。 15 * @author braska 16 * @date 2018/07/06. 17 **/ 18 public class roundloadbalancer extends abstractloadbalancer { 19 20 private integer currentserver; 21 private integer currentweight; 22 private integer maxweight; 23 private integer gcdweight; 24 25 @override 26 public server choose(iloadbalancer loadbalancer) { 27 list<serveraddress> addresslist = serverservice.getavailableserver(); 28 if (addresslist != null && !addresslist.isempty()) { 29 maxweight = loadbalancerruleutil.getmaxweightforservers(addresslist); 30 gcdweight = loadbalancerruleutil.getgcdforservers(addresslist); 31 currentserver = integer.parseint(globalcache.instance().getordefault(constant.current_server_key, -1).tostring()); 32 currentweight = integer.parseint(globalcache.instance().getordefault(constant.current_weight_key, 0).tostring()); 33 34 integer servercount = addresslist.size(); 35 36 if (1 == servercount) { 37 return new server(addresslist.get(0).gethost(), addresslist.get(0).getport()); 38 } else { 39 while (true) { 40 currentserver = (currentserver + 1) % servercount; 41 if (currentserver == 0) { 42 currentweight = currentweight - gcdweight; 43 if (currentweight <= 0) { 44 currentweight = maxweight; 45 if (currentweight == 0) { 46 globalcache.instance().put(constant.current_server_key, currentserver); 47 globalcache.instance().put(constant.current_weight_key, currentweight); 48 thread.yield(); 49 return null; 50 } 51 } 52 } 53 54 serveraddress address = addresslist.get(currentserver); 55 if (address.getweight() >= currentweight) { 56 
globalcache.instance().put(constant.current_server_key, currentserver); 57 globalcache.instance().put(constant.current_weight_key, currentweight); 58 return new server(address.gethost(), address.getport()); 59 } 60 } 61 } 62 63 } 64 return null; 65 } 66 }
最后,将ServerLoadBalancerRule交由Spring托管。
至此,springboot+zuul实现自定义过滤器、动态路由、动态负载就都完成了。
源码: