WebFlux服务端开发
程序员文章站
2022-04-30 20:00:36
...
一.SpringWebFlux基础知识
1.概念
- Spring5提出的新的开发Web的技术栈,非阻塞的开发模式,运行在netty或servlet3.1上,支持很高的并发量
- 非阻塞的概念
- WebFlux一个线程里可以处理更多的请求
- 老的开发模式:一个请求会对应容器里的一个线程
- 运行环境的不同
- 老的开发模式:基于ServletAPI,即运行在Servlet容器上面
- Webflux开发模式:基于响应式流,可以运行在Servlet3.1之后的容器(异步Servlet容器)或Netty上
- 数据库的不同
- 目前关系型数据库都是不支持响应式(基于JDBC)的数据库
- Reactive stack的SpringDataReactiveRepositories是:Mongo,Cassandra,Redis,Couchbase
2. 开发优势
- 支持高并发(异步非阻塞模式——垂直扩展)
【题外知识:扩展分为两部分水平扩展和垂直扩展,水平扩展指的是人员和硬件设备的增加,垂直扩展只的是技术栈的变更】
二.异步Servlet
1.为什么要使用异步Servlet?同步Servlet阻塞了什么?
- 同步Servlet代码
@WebServlet("/SyncServlet")
public class SyncServlet extends HttpServlet {
public SyncServlet() {
super();
}
protected void doGet(HttpServletRequest request, HttpServletResponse) {
//获取进入时时间
long t1 = System.currectTimeMillis();
//执行业务代码
doSomeThing(request, response);
//打印执行耗时
System.out.println("Sync use:"+(System.currectTimeMillis()-t1));
}
protected void doPost(HttpServletRequest request, HttpServletResponse) {
doGet(request, response);
}
//耗时操作执行逻辑
public void doSomeThing(HttpServletRequest request, HttpServletResponse) {
//模拟耗时操作
try {
TimeUnit.SECONDS.sleep(5);//延迟5秒
} catch(InterruptedException e) {}
response.getWriter().append("done");
}
}
//访问请求大约耗时5s
- 同步Servlet阻塞了Tomcat容器的servlet线程。
- 请求执行流程:当网络请求发送到Tomcat容器之后,Tomcat容器会给每一个请求创建一个线程去处理,线程里会调用指定Servlet去处理。当使用同步Servlet时,业务代码花多长时间,Servlet代码就要等多长时间。
- 异步Servlet代码[不会阻塞Tomcat的Servlet线程]
@WebServlet(asyncSupported=true, urlPatterns={"/AsyncServlet"})
public class AsyncServlet extends HttpServlet {
public AsyncServlet() {
super();
}
protected void doGet(HttpServletRequest request, HttpServletResponse) {
//获取进入时时间
long t1 = System.currectTimeMillis();
//开启异步,获得异步执行的上下文
AsyncContext asyncContext = request.startAsync();
//使用JDK8的CompletableFuture.runAsync(()->...);的方式执行异步操作在不同线程里处理[执行业务代码,传递异步执行的上下文请求及响应]
CompletableFuture.runAsync(()->doSomeThing(asyncContext,asyncContext.getRequest(), asyncContext.getResponse()));
//打印执行耗时
System.out.println("Async use:"+(System.currectTimeMillis()-t1));
}
protected void doPost(HttpServletRequest request, HttpServletResponse) {
doGet(request, response);
}
//耗时操作执行逻辑
public void doSomeThing(AsyncContext asyncContext,ServletRequest request, ServletResponse) {
//模拟耗时操作
try {
TimeUnit.SECONDS.sleep(5);//延迟5秒
} catch(InterruptedException e) {}
response.getWriter().append("done");
//业务代码处理完毕,通知结束
asyncContext.complete();
}
}
//访问请求大约耗时16ms
2.异步Servlet怎样编写
- 开启异步上下文
- 将异步代码放到独立的线程池中执行
- 调用异步上下文的complete()方法通知结束
三.WebFlux开发
1.Mono对象实战开发
-
添加Reactive Web依赖
-
实例代码
@RestController public class TestController { @GetMapping("/1") private String get1() { return "some string"; } @GetMapping("/2") private Mono<String> get2() { Mono<String>result = Mono.fromSupplier(()->"some string"); //返回Mono不会阻塞线程,如果是耗时操作会异步执行 return result; } }
-
Mono对象是Reactor里面的对象
- reactor=jdk8 stream+jdk9 reactive stream
- Mono 0-1个元素(序列)
- Flux 0-N个元素(序列)
-
实例演示
String[] strs = {"1","2","3"}; //定义订阅者 Subscriber<Integer> subscriber = new Subscriber<Integer>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { //保存订阅关系,需要用它来给发布者响应 this.subscription = subscription; //请求一个数据 this.subscription.request(1); } @Override public void onNext(Integer item) { //接受一个数据,处理 System.out.println("接收到数据:"+item); //处理完调用request再请求一个数据 this.subscription.request(1); //或者 已经达到了目标,调用cancel告诉发布者不再接受数据了 //this.subscription.cancel(); } @Override public void onError(Throwable throwable) { //出现了异常(例如处理数据的时候产生了异常) throwable.printStackTrace(); //我们可以告诉发布者,后面不接受数据了 this.subscription.cancel(); } @Override public void onComplete() { //全部数据处理完了(发布者关闭了) System.out.println("处理完了!"); } } //这里是jdk8的stream Flux.fromArray(strs).map(s->Integer.parseInt(s)) //最终操作是订阅,即jdk9的reactive stream .subscribe(subscriber); //输出结果: //接收到数据:1 //接收到数据:2 //接收到数据:3 //处理完了!
-
2.Flux对象实战开发
-
实例代码
@RestController public class TestController { /** * Flux:返回0-n个元素 * 返回的属性produces需要指定成流的形式 */ @GetMapping(value="/3", produces=MediaType.TEXT_EVENT_STREAM_VALUE) private Flux<String> flux() { Flux<String>result = Flux.fromStream(IntStream.range(1,5).mapToObj(i->{ try{ TimeUnit.SECONDS.sleep(1); }catch(InterruptedException e) {} return "flux data--"+i; })); return result; } } //页面输出: //flux data--1 //[隔1秒后页面继续打印] //flux data--2 //[隔1秒后页面继续打印] //flux data--3 //[隔1秒后页面继续打印] //flux data--4
四.SSE(Server-Sent Events)
http是一问一答的形式,可是Flux是如何做到一次请求页面一点一点输出的呢?通过的就是H5的SSE。
1.实例代码
- 服务端Servlet处理逻辑【向页面上间隔1秒输出信息】
protected void doGet(HttpServletRequest request, HttpServletResponse response) {
//设置SSE是必须要设置ContentType和CharacterEncoding
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
for(int i=0;i<5;i++){
response.getWriter().write("data:"+i+"\n\n");
response.getWriter().flush();
try {
TimeUtil.SENCONDS.sleep(1);
} catch(InterruptedException e){}
}
}
- 前台代码
<html>
<!-- 此处省略其他设置 -->
<script type="text/javascript">
//初始化,参数为url[即后台API地址]
var sse = new EventSource("SSE");
sse.onmessage = function(e) {
console.log("message",e.data,e);//将数据打印出来,数据在方法参数中
}
// SSE有自动重连的特性,即输出一圈 data:0/1/2/3/4 后还会再输出一圈
// 如果想停止自动重连则需要在function中添加判断
</script>
</html>
五.响应式微服务实例开发
1.环境搭建
- 在https://start.spring.io中引入ReativeMongoDB和SpringReactiveWeb[包含WebFlux]依赖,并下载项目
- 在Application.java中引入启动MongoDB的注解
@EnableReactiveMongoRepositories
2.MongoDB连接初试
-
添加Domain类
@Document(collection="user")//表示对应的MongoDB中的表名 @Data//lambok注解,自动编写Set/Get/toString,不需人为添加 public class User { @Id//定义Id,MongoDB中id一般都是字符串类型 private String id; private String name; private int age; }
-
编写仓库操作接口代码
@Repository public interface UserRepository extends ReactiveMongoRepository<User, String> { //编写根据年龄查找用户,Springboot JPA会自动帮助生成对应的SQL public Flux<User> findByAgeBetween(int start, int end); //也可以通过@Query注解编写MongoDB的查询语句 @Query("{'age':{'$gte':20,'$lte':30}}")//表示大于等于20和小于等于30 public oldUser(); }
-
编写Controller
@RestController @RequestMapping("/user") public class UserController { private final UserRepository repository; public class UserController(UserRepository repository) { this.repository = repository; } /** * 以数组形式一次返回数据 */ @GetMapping("/") public Flux<User> getAll() {//直接返回 //引入仓库 return repository.findAll(); } /** * 以SSE形式多次返回数据 */ @GetMapping(value="/stream/all", produces=MediaType.TEXT_EVENT_STREAM_VALUE)//以流式返回 public Flux<User> streamGetAll() {//流式返回 //引入仓库 return repository.findAll(); } /** * 新增数据,返回新增的数据信息 */ @PostMapping("/") public Mono<User> createUser(@RequestBody User user) { //spring data jpa里面,新增和修改都是save,有id是修改,id为空是新增 //根据实际情况是否置空id user.setId(null); return this.repository.save(user); } /** * 根据id删除用户 * 存在的时候返回200,不存在返回404 */ @DeleteMapping("/{id}") public Mono<ResponseEntity<Void>> deleteUser (@PathVariable("id")String id) { // this.responsity.deleteById(id) 返回Mono<Void>没有返回值,不能判断数据是否存在 return this.respository.findById(id) //当要操作数据并返回Mono时使用flatMap //如果不操作数据,只是对数据进行转换,使用map .flatMap(user->this.responsitory.delete(user) .then(Mono.just(new ResponseEntity<>(HttpStatus.OK)))) .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND)); } /** * 修改数据 * 存的时候返回200,修改后的数据不存在时返回404 */ @PutMapping("/{id}") public Mono<ResponseEntity<User>> updateUser( @PathVariable("id")String id, @RequestBody User user ) { return this.respository.findById(id) //操作数据,操作返回Mono .flatMap(u->{ u.setAge(user.getAge()); u.setName(user.getName()); return this.repository.save(u);//没有直接发送this.repository.save(user)的原因是不知道传过来的User对象会不会带id,如果没带id则会执行插入 }) //转换数据 .map(u->new ResponseEntity<User>(u.HttpStatus.OK)) .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND)); } /** * 根据id查找用户 * 存在返回用户信息,不存在返回404 */ @GetMapping("/{id}") public Mono<ResponseEntity<User>> findUserById( @PathVariable("id") String id ){ return this.respository.findById(id) .map(u-> new ResponseEntity<User>(u, HttpStatus.OK)) .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND)); } /** * 根据年龄查找用户 */ @GetMapping("/age/{start}/{end}") public Flux<User> findByAge(@PathVariable("start") int start, @PathVariable("end") int end) { return this.repository.findByAgeBetween(start,end); } }
-
下载MongoDB并启动[默认端口是27017]
-
在properties中配置数据库
#配置数据库 spring.data.mongodb.uri=mongodb://localhost:27017/webfluxDB
3.添加参数校验部分
-
在Domain.java类中添加注解进行参数校验
@Document(collection="user")//表示对应的MongoDB中的表名 @Data//lambok注解,自动编写Set/Get/toString,不需人为添加 public class User { @Id//定义Id,MongoDB中id一般都是字符串类型 private String id; @NotBlank//非空 private String name; @Range(min=10, max=20)//设置属性值区间 private int age; }
-
在Controller接收的API处使用@Vaild注解,如下
/** * 新增数据,返回新增的数据信息 */ @PostMapping("/") public Mono<User> createUser(@Vaild @RequestBody User user) { return this.repository.save(user); }
-
添加一个Controller的切面,用于当参数不通过的时候抛出异常
/** * 异常处理切面 */ @ControllerAdvice public class CheckAdvice { //用于处理异常 @ExceptionHandler(WebExchangeBindException.class) public ResponseEntity handleBindException(WebExchangeBindException e) { return new ResponseEntity<String>(toStr(e), HttpStatus.BAD_REQUEST); } //将校验异常转成字符串 public String toStr(WebExchangeBindException e) { return e.getFieldErrors().stream()//获得异常的字段数组 .map(e->e.getField()+":"+e.getDefaultMessage());//将异常对象数组转成字符串数组 .reduce("",(s1,s2)->s1+"\n"+s2);//将字符串数组reduce拼接 } }
六.RouterFunction模式实例开发
使用RouterFunction模式开发所有的Controller长得都一样,都是输入ServerRequest,返回ServerResponse
1.编写HandlerFunction(即Controller层)
-
编写UserHandler
@Component public class UserHandler { private final UserRepository repository; public UserHandler(UserRepository repository) { this.repository = repository; } /** * 得到所有用户 */ public Mono<ServerResponse> getAllUser(ServerRequest request) { //.ok()表示返回的状态码是200 //.contentType()表示返回的ContentType内容 //.body()表示响应体内容 return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8) .body(this.repository.findAll(),User.class); } /** * 创建用户 */ public Mono<ServerResponse> createUser(ServerRequest request) { Mono<User> user = request.bodyToMono(User.class);//通过bodyToMono将请求中提交的数据转换成Mono<User>类型 //.ok()表示返回的状态码是200 //.contentType()表示返回的ContentType内容 //.body()表示响应体内容 return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8) .body(this.repository.saveAll(user),User.class); } /** * 根据id删除用户 */ public Mono<ServerResponse> deleteUserById (ServerRequest request) { String id = request.pathVariable("id");//获取request请求对象中的PathVariable的值 return this.repository.findById(id) .flatMap(user->this.repository.delete(user) .then(ServerResponse.ok().build())) .switchIfEmpty(ServerResponse.notFound().build()); } }
2.编写RouterFunction
RouterFunction的作用是将一个url与一个HandleFunction对应起来
-
编写AllRouters类做RouterFunction功能
@Configuration public class AllRouters { //RouterFunction是将url与handleFunction对应起来 @Bean RouterFunction<ServerResponse> userRouter(UserHandler handler) {//通过参数的形式传入对应的Handler类 //RequestPredicates.path("/url")的方式给出统一的url前缀 //当访问/user/接口时会将请求发送到UserHandler的getAllUser()方法上 return RouterFunction.nest( RequestPredicates.path("/user"),//相当于类上的RequestMapping //使用router定义方法上的RequestMapping和对应的方法 RouterFunctions.route(RequestPredicates.GET("/"),hander::getAllUser) //使用router可以配置多个路由 .andRoute( RequestPredicates.POST("/") .and(accept(MediaType.APPLICATION_JSON_UTF8)), handler::createUser ) //删除用户 .andRoute( RequestPredicates.DELETE("/{id}"), handler::deleteUserById ); ); } }
3.编写ExceptionHandler异常处理类
-
实例代码
@Component @Order(-2)//默认有很多Handler的实现类,需要把优先级调高,否则不会工作[数值越小,优先级越高] public class ExceptionHandler implements WebExceptionHandler { @Override public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) { ServerHttpResponse response = exchange.getResponse(); //设置响应头400 response.setStatusCode(HttpStatus.BAD_REQUEST); //设置返回类型 response.getHeaders().setContentType(MediaType.TEXT_PLAIN); //异常信息 String errorMsg = toStr(ex); DataBuffer db = response.bufferFactory().wrap(errorMsg.getBytes()); return response.writeWith(Mono.just(db)); } private String toStr(Throwable ex) { //已知异常 if(ex instanceof CheckException) { CheckException e = (CheckException) ex; return e.getFieldName() + ": invalid value "+e.getFieldValue(); } //未知异常,需要打印堆栈,方便定位 else { ex.printStackTrace(); return ex.toString(); } } }