欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Boot WebFlux 入门

程序员文章站 2022-05-22 09:06:15
...

1. 概述

友情提示:Reactive Programming ,翻译为反应式编程,又称为响应式编程。本文,我们统一使用响应式。不过,比较正确的叫法还是反应式。

Spring Framework 5 在 2017 年 9 月份,发布了 GA 通用版本。既然是一个新的大版本,必然带来了非常多的改进,其中比较重要的一点,就是将响应式编程带入了 Spring 生态。又或者说,将响应式编程“真正”带入了 Java 生态之中。

在此之前,相信绝大多数 Java 开发者,对响应式编程的概念是非常模糊的。甚至说,截止到目前 2019 年 11 月份,对于国内的 Java 开发者,也是知之甚少。

对于我们来说,最早看到的就是 Spring5 提供了一个新的 Web 框架,基于响应式编程的 Spring WebFlux 。至此,SpringMVC 在“干掉” Struts 之后,难道要开始进入 Spring 自己的两个 Web 框架的双雄争霸?

实际上,WebFlux 在出来的两年时间里,据艿艿所了解到的情况,鲜有项目从采用 SpringMVC 迁移到 WebFlux ,又或者新项目直接采用 WebFlux 。这又是为什么呢?

响应式编程,对我们现有的编程方式,是一场颠覆,对于框架也是。

  • 在 Spring 提供的框架中,实际并没有全部实现好对响应式编程的支持。例如说,Spring Transaction 事务组件,在 Spring 5.2 M2 版本,才提供了支持响应式编程的 ReactiveTransactionManager 事务管理器。
  • 更不要说,Java 生态常用的框架,例如说 MyBatis、Jedis 等等,都暂未提供响应式编程的支持。

所以,WebFlux 想要能够真正普及到我们的项目中,不仅仅需要 Spring 自己体系中的框架提供对响应式编程的很好的支持,也需要 Java 生态中的框架也要做到如此。例如说:Spring Boot WebFlux 入门

艿艿:???? Java 框架存在大量基于 ThreadLocal 线程变量,实现参数的透传,改造的成本,实际是不小的。

当然,即使如此,这也并不妨碍我们来对 WebFlux 进行一个小小的入门。毕竟,响应式编程这把火,终将熊熊燃起,烧死那些异性恋。哈哈哈~

艿艿:下面的会涉及比较多的概念,不想看的胖友,直接跳到 「2. 快速入门」 小节,直接开始 WebFlux 的入门。

1.1 响应式编程

我们先简单来了解下响应式编程的相关姿势,以保证能够看懂 WebFlux 入门的代码示例,哈哈哈~

*对响应式编程定义如下:

FROM https://en.wikipedia.org/wiki/Reactive_programming

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).

反应式编程是一种异步编程范式,它关注数据流和变化的传播。这意味着可以通过使用编程语言轻松地表示静态(如数组)或动态(如事件发射器)数据流。

Spring 官方文档对响应式编程定义如下:

FROM https://docs.spring.io/spring-framework/docs/5.0.0.BUILD-SNAPSHOT/spring-framework-reference/html/web-reactive.html#web-reactive-programming

In plain terms reactive programming is about non-blocking applications that are asynchronous and event-driven and require a small number of threads to scale vertically (i.e. within the JVM) rather than horizontally (i.e. through clustering).

简单地说,响应式编程是关于非阻塞应用程序的,这些应用程序是异步的、事件驱动的,并且需要少量的线程来垂直伸缩(即在 JVM 中),而不是水平伸缩(即通过集群)。

???? 两个看起来都不很易懂。不过如果胖友看过 Netty 框架的介绍,会发现跟 Spring 的描述非常相像。定义如下:

FROM https://www.oschina.net/p/netty

Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

是不是都看到了异步 + 事件驱动。本质上,Netty 也是有基于响应式编程的思想。所以在下文中,我们会看到,可以使用 Netty 作为 WebFlux 的服务器。

哔哔了这么多,艿艿来用简单但不完全精准的语言尝试下。以后端 API 请求的处理来举例子。

  • 在现在主流的编程模型中,请求是被同步阻塞处理完成,返回结果给前端。
  • 在响应式的编程模型中,请求是被作为一个事件丢到线程池中执行,等到执行完毕,异步回调结果给主线程,最后返回给前端。

通过这样的方式,主线程(实际是多个,这里只是方便描述哈)不断接收请求,不负责直接同步阻塞处理,从而避免自身被阻塞。

1.2 Reactor 框架

在 Java 生态中,提供响应式编程的框架主要有 ReactorRxJavaJDK9 Flow API 。

那么,Spring 会选择哪个框架作为其响应式编程的基础呢?

  • 首先,可以排除 JDK9 Flow API ,因为 Spring5 要支持 JDK8 版本开始。
  • 其次,Reactor 是 Spring 公司 Pivotal(咳咳咳,2019 年竟然被 VMWare 收购了)是开源的框架,所以必然是“强强联合”,嘿嘿。

如果胖友想要了解 Reactor 和 RxJava 的对比,可以看看 《八个层面比较 Java 8, RxJava, Reactor》 文章,挺详细的。

让我们一起来看看 Reactor 官方对自己的介绍:

FROM https://projectreactor.io/

Reactor is a fourth-generation Reactive library for building non-blocking applications on the JVM based on the Reactive Streams Specification

Reactor 是一个第四代响应式编程框架,用于构建非阻塞 JVM 应用程序,基于 Reactive Streams Specification 来实现。

Reactor Operators and Schedulers can sustain high throughput rates on the order of 10's of millions of messages per second.

Reactor 的操作和调度可以提供每秒千万条消息的高吞吐量。

Plus its low memory footprint should go under most of the radars.

再加上它的低内存占用,应该在大多数雷达(radars)之下。咳咳咳,这个 radars 怎么翻译。

简单来说,Reactor 说是一个响应式编程框架,又快又不占用内存的那种。????

关于 Reactor 的使用,这里艿艿就不过多介绍,感兴趣的胖友,可以看看 《使用 Reactor 进行反应式编程》 文章。如下是对其中的一段内容的节选并修改:

Reactor 有两个非常重要的基本概念:

  • Flux ,表示的是包含 0 到 N 个元素的异步序列。当消息通知产生时,订阅者(Subscriber)中对应的方法 #onNext(t)#onComplete(t) 和 #onError(t) 会被调用。
  • Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。
  • 同时,Flux 和 Mono 之间可以进行转换。例如:
    • 对一个 Flux 序列进行计数操作,得到的结果是一个 Mono<Long> 对象。
    • 把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

???? 其实,可以先暂时简单把 Mono 理解成 Object ,Flux 理解成 List 。嘿嘿~

1.3 Spring WebFlux

Spring 官方文档对 Spring WebFlux 介绍如下:

FROM https://docs.spring.io/spring-framework/docs/5.0.0.BUILD-SNAPSHOT/spring-framework-reference/html/web-reactive.html

Spring Framework 5 includes a new spring-webflux module. The module contains support for reactive HTTP and WebSocket clients as well as for reactive server web applications including REST, HTML browser, and WebSocket style interactions.

Spring Framework 5 提供了一个新的 spring-webflux 模块。该模块包含了:

  • 对响应式支持的 HTTP 和 WebSocket 客户端。
  • 对响应式支持的 Web 服务器,包括 Rest API、HTML 浏览器、WebSocket 等交互方式。

On the server-side WebFlux supports 2 distinct programming models:

  • Annotation-based with @Controller and the other > annotations supported also with Spring MVC
  • Functional, Java 8 lambda style routing and handling

在服务端方面,WebFlux 提供了 2 种编程模型(翻译成使用方式,可能更易懂):

  • 方式一,基于 Annotated Controller 方式实现:基于 @Controller 和 SpringMVC 使用的其它注解。???? 也就是说,我们大体上可以像使用 SpringMVC 的方式,使用 WebFlux 。
  • 方式二,基于函数式编程方式:函数式,Java 8 lambda 表达式风格的路由和处理。???? 可能有点晦涩,晚点我们看了示例就会明白。

Both programming models are executed on the same reactive foundation that adapts non-blocking HTTP runtimes to the Reactive Streams API.

这两个编程模型,都是在同一个响应式基础(foundation)上执行的,该基础将非阻塞 HTTP 运行时(runtime)适配成响应式 API 。???? 简单来说,就是将原有的 API ,使用 Reactor 封装成响应式 API ,让我们开发者使用更加便捷。

The diagram below shows the server-side stack including traditional, Servlet-based Spring MVC on the left from the spring-webmvc module and also the reactive stack on the right from the spring-webflux module.

下图显示了服务端的技术栈,左侧是 spring-webmvc 模块中传统的、基于 Servlet 的 Spring MVC ,右侧是 spring-webflux 模块中的响应式技术栈。

Spring Boot WebFlux 入门

  • ???? 仔细看第一层的两个框框,分别是上面提到的 WebFlux 的两种编程模型。表达的是 SpringMVC 不支持 Router Functions 方式,而 WebFlux 支持。

WebFlux can run on Servlet containers with support for the Servlet 3.1 Non-Blocking IO API as well as on other async runtimes such as Netty and Undertow.

WebFlux 可以运行在:

  • 支持 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上
  • 也可以运行在支持异步运行时的,例如说 Netty 或者 Undertow 上

Each runtime is adapted to a reactive ServerHttpRequest and ServerHttpResponse exposing the body of the request and response as Flux<DataBuffer>, rather than InputStream and OutputStream, with reactive backpressure.

每一个运行时(runtime)适用于将响应式的 ServerHttpRequest 和 ServerHttpResponse 中 request 和 response 的 body 暴露成 Flux<DataBuffer> 对象,而不是 InputStream 和 InputStream 对象,可用于响应式中的背压(backpressure)。???? 这段有点晦涩,简单来说:

REST-style JSON and XML serialization and deserialization is supported on top as a Flux<Object>, and so is HTML view rendering and Server-Sent Events.

REST 风格 API 使用到的 JSON 和 XML 序列化和反序列化,需要提供对 Flux<Object> 的支持。对于 HTML 渲染,和 SSE 也要提供对 Flux<Object> 的支持。

???? 咳咳咳,看完了这一大段,是不是突然有点想捶死艿艿,说的什么 XX 玩样啊!其实,在我们初学 SpringMVC 的时候,也是一脸懵逼的学完。随着我们对 SpringMVC 的日趋熟练,逐步对其提供的组件、原理、源码慢慢熟悉。所以,对于我们来说,WebFlux 乃至响应式编程来说,都是足够新颖的知识,我们要抱着空杯心态,「Stay Hungry, Stay Foolish」 。

如果胖友的时间比较充分,可以选择把 《Spring 文档 —— Web on Reactive Stack》 仔细看看,详尽的介绍了 Spring 在 Web 方面,响应式相关的技术栈。

虽然说上面我们在介绍 WebFlux ,把它搞的很复杂,实际在快速入门使用它,还是非常简单的。下面,开始让我们开始愉快的快速入门下~

考虑到艿艿之前已经写了 《芋道 Spring Boot SpringMVC 入门》 文章,所以本文我们提供的示例,尽量覆盖到在 SpringMVC 提到的内容。

2. 快速入门

示例代码对应仓库:lab-27-webflux-01 。

本小节,我们会使用 spring-boot-starter-webflux 实现 WebFlux 的自动化配置。然后实现用户的增删改查接口。接口列表如下:

请求方法 URL 功能
GET /users/list 查询用户列表
GET /users/get 获得指定用户编号的用户
POST /users/add 添加用户
POST /users/update 更新指定用户编号的用户
POST /users/delete 删除指定用户编号的用户

下面,开始遨游~

2.1 引入依赖

在 pom.xml 文件中,引入相关依赖。 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-27-webflux-01</artifactId>

    <dependencies>
        <!-- 实现对 Spring WebFlux 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <!-- 方便等会写单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>

我们使用 IDEA Maven 插件 ,查看下 spring-boot-starter-webflux 依赖中,所引入的依赖。如下图所示: 

Spring Boot WebFlux 入门 

  • 引入 reactor-core 依赖,使用 Reactor 作为 WebFlux 的响应式框架的基础。
  • 引入 spring-boot-starter-reactor-netty 依赖,使用 Netty 构建 WebFlux 的 Web 服务器。其中 RxNetty 库,是基于 Reactor 的响应式框架的基础之上,提供出 Netty 的响应式 API 。

当然,我们除了使用可以使用其它作为 WebFlux 的 Web 服务器,如下表格:

Server name Server API used Reactive Streams support
Netty Netty API Reactor Netty
Undertow Undertow API spring-web: Undertow to Reactive Streams bridge
Tomcat Servlet 3.1 non-blocking I/O; Tomcat API to read and write ByteBuffers vs byte[] spring-web: Servlet 3.1 non-blocking I/O to Reactive Streams bridge
Jetty Servlet 3.1 non-blocking I/O; Jetty API to write ByteBuffers vs byte[] spring-web: Servlet 3.1 non-blocking I/O to Reactive Streams bridge
Servlet 3.1 container Servlet 3.1 non-blocking I/O spring-web: Servlet 3.1 non-blocking I/O to Reactive Streams bridge
  • 当然,也需要基于 Reactor 的响应式框架的基础之上,封装相应的响应式 API 。

可能胖友会有疑惑,为什么 WebFlux 运行在 Servlet 容器上时,需要 Servlet 3.1+ 以上的容器呢?在 Servlet 3.1 规范发布时,它定义了非常重要的特性,Non-blocking I/O 非阻塞 IO ,提供了异步处理请求的支持。我们来详细展开下:

  • 在 Servlet 3.1 规范之前的版本,请求是只能被 Servlet 同步阻塞处理完成,返回结果给前端。
  • 在 Servlet 3.1 规范开始的版本,请求是允许被 Servlet 丢到线程池中处执行,等到执行完毕,异步回调结果给 Servlet ,最后返回给前端。

推荐胖友在阅读完本文之后,可以看看 《Servlet 3.0/3.1 中的异步处理》 文章,可以对 WebFlux 有更好的理解。

2.2 Application

创建 Application.java 类,配置 @SpringBootApplication 注解即可。代码如下:

// Application.java

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

在 「1.3 Spring WebFlux」 小节中,我们提到了 WebFlux 有两种编程模型,分别是:

  • 方式一,基于 Annotated Controller 方式实现
  • 方式二,基于函数式编程方式

我们分别在下面两个小节来看看。

2.3 基于 Annotated Controller 方式实现

在 cn.iocoder.springboot.lab27.springwebflux.controller 包路径下,创建 UserController 类。代码如下:

// UserController.java

@RestController
@RequestMapping("/users")
public class UserController {

    /**
     * 查询用户列表
     *
     * @return 用户列表
     */
    @GetMapping("/list")
    public Flux<UserVO> list() {
        // 查询列表
        List<UserVO> result = new ArrayList<>();
        result.add(new UserVO().setId(1).setUsername("yudaoyuanma"));
        result.add(new UserVO().setId(2).setUsername("woshiyutou"));
        result.add(new UserVO().setId(3).setUsername("chifanshuijiao"));
        // 返回列表
        return Flux.fromIterable(result);
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */
    @GetMapping("/get")
    public Mono<UserVO> get(@RequestParam("id") Integer id) {
        // 查询用户
        UserVO user = new UserVO().setId(id).setUsername("username:" + id);
        // 返回
        return Mono.just(user);
    }

    /**
     * 添加用户
     *
     * @param addDTO 添加用户信息 DTO
     * @return 添加成功的用户编号
     */
    @PostMapping("add")
    public Mono<Integer> add(@RequestBody Publisher<UserAddDTO> addDTO) {
        // 插入用户记录,返回编号
        Integer returnId = 1;
        // 返回用户编号
        return Mono.just(returnId);
    }

    /**
     * 更新指定用户编号的用户
     *
     * @param updateDTO 更新用户信息 DTO
     * @return 是否修改成功
     */
    @PostMapping("/update")
    public Mono<Boolean> update(@RequestBody Publisher<UserUpdateDTO> updateDTO) {
        // 更新用户记录
        Boolean success = true;
        // 返回更新是否成功
        return Mono.just(success);
    }

    /**
     * 删除指定用户编号的用户
     *
     * @param id 用户编号
     * @return 是否删除成功
     */
    @PostMapping("/delete") // URL 修改成 /delete ,RequestMethod 改成 DELETE
    public Mono<Boolean> delete(@RequestParam("id") Integer id) {
        // 删除用户记录
        Boolean success = false;
        // 返回是否更新成功
        return Mono.just(success);
    }

}
  • 在类和方法上,我们添加了 @Controller 和 SpringMVC 在使用的 @GetMapping 和 PostMapping 等注解,提供 API 接口,这个和我们在使用 SpringMVC 是一模一样的。

  • 在 dto 和 vo 包下,有 API 使用到的 DTO 和 VO 类。

  • ???? 因为是入门示例,我们会发现代码十分简单,保持淡定。在后文中,我们会提供和 Spring Data JPA、Spring Data MongoDB、Spring Data Redis 等等整合的示例。

  • #list() 方法,我们最终调用 Flux#fromIterable(Iterable<? extends T> it) 方法,将 List 包装成 Flux 对象返回。

  • #get(Integer id) 方法,我们最终调用 Mono#just(T data) 方法,将 UserVO 包装成 Mono 对象返回。

  • #add(Publisher<UserAddDTO> addDTO) 方法,参数为 Publisher 类型,泛型为 UserAddDTO 类型,并且添加了 @RequestBody 注解,从 request 的 Body 中读取参数。注意,此时提交参数需要使用 "application/json" 等 Content-Type 内容类型。

  • #add(...) 方法,也可以使用 application/x-www-form-urlencoded 或 multipart/form-data 这两个 Content-Type 内容类型,通过 request 的 Form Data 或 Multipart Data 传递参数。代码如下:

// UserController.java

/**
 * 添加用户
 *
 * @param addDTO 添加用户信息 DTO
 * @return 添加成功的用户编号
 */
@PostMapping("add2")
public Mono<Integer> add(Mono<UserAddDTO> addDTO) {
    // 插入用户记录,返回编号
    Integer returnId = UUID.randomUUID().hashCode();
    // 返回用户编号
    return Mono.just(returnId);
}

 

2.4 基于函数式编程方式

在 cn.iocoder.springboot.lab27.springwebflux.controller 包路径下,创建 UserRouter 类。代码如下:

// UserRouter.java

@Configuration
public class UserRouter {

    @Bean
    public RouterFunction<ServerResponse> userListRouterFunction() {
        return RouterFunctions.route(RequestPredicates.GET("/users2/list"),
                new HandlerFunction<ServerResponse>() {

                    @Override
                    public Mono<ServerResponse> handle(ServerRequest request) {
                        // 查询列表
                        List<UserVO> result = new ArrayList<>();
                        result.add(new UserVO().setId(1).setUsername("yudaoyuanma"));
                        result.add(new UserVO().setId(2).setUsername("woshiyutou"));
                        result.add(new UserVO().setId(3).setUsername("chifanshuijiao"));
                        // 返回列表
                        return ServerResponse.ok().bodyValue(result);
                    }

                });
    }

    @Bean
    public RouterFunction<ServerResponse> userGetRouterFunction() {
        return RouterFunctions.route(RequestPredicates.GET("/users2/get"),
                new HandlerFunction<ServerResponse>() {

                    @Override
                    public Mono<ServerResponse> handle(ServerRequest request) {
                        // 获得编号
                        Integer id = request.queryParam("id")
                                .map(s -> StringUtils.isEmpty(s) ? null : Integer.valueOf(s)).get();
                        // 查询用户
                        UserVO user = new UserVO().setId(id).setUsername(UUID.randomUUID().toString());
                        // 返回列表
                        return ServerResponse.ok().bodyValue(user);
                    }

                });
    }

    @Bean
    public RouterFunction<ServerResponse> demoRouterFunction() {
        return route(GET("/users2/demo"), request -> ok().bodyValue("demo"));
    }

}
  • 在类上,添加 @Configuration 注解,保证该类中的 Bean 们,都被扫描到。

  • 在每个方法中,我们都通弄 RouterFunctions#route(RequestPredicate predicate, HandlerFunction<T> handlerFunction) 方法,定义了一条路由。

    • 第一个参数 predicate 参数,是 RequestPredicate 类型,请求谓语,用于匹配请求。可以通过 RequestPredicates 来构建各种条件。
    • 第二个参数 handlerFunction 参数,是 RouterFunction 类型,处理器函数。
  • 每个方法定义的路由,胖友自己看下代码,一眼能看的明白。一般来说,采用第三个方法的写法,更加简洁。注意,需要使用 static import 静态引入,代码如下:

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;
import static org.springframework.web.reactive.function.server.ServerResponse.*;

一般来说,艿艿更加推荐基于 Annotated Controller 方式实现的编程方式,更符合我们现在的开发习惯,学习成本也相对低一些。同时,和 API 接口文档工具 Swagger 也更容易集成。

???? 有没觉得每个 HandlerFunction 函数,和每个 Servlet 有点像。

更多基于函数式编程方式的示例,可以看看如下两篇文章:

3. 测试接口

示例代码对应仓库:lab-27-webflux-01 。

在开发完接口,我们会进行接口的自测。一般情况下,我们先启动项目,然后使用 Postmancurl、浏览器,手工模拟请求后端 API 接口。

实际上,WebFlux 提供了 Web 测试客户端 WebTestClient 类,方便我们快速测试接口。下面,我们对 UserController 提供的接口,进行下单元测试。也就是说,本小节,我们会继续在 lab-27-webflux-01 示例的基础上修改。

MockMvc 提供了集成测试和单元测试的能力,我们分成 「3.1 集成测试」 和 「3.2 单元测试」 来看。如果胖友对测试这块不太了解,可以看看如下两篇文章:

3.1 集成测试

创建 UserControllerTest 测试类,我们来测试一下简单的 UserController 的每个操作。核心代码如下:

// UserControllerTest.java

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@AutoConfigureWebFlux
@AutoConfigureWebTestClient
public class UserControllerTest {

    @Autowired
    private WebTestClient webClient;

    @Test
    public void testList() {
        webClient.get().uri("/users/list")
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("[\n" +
                "    {\n" +
                "        \"id\": 1,\n" +
                "        \"username\": \"yudaoyuanma\"\n" +
                "    },\n" +
                "    {\n" +
                "        \"id\": 2,\n" +
                "        \"username\": \"woshiyutou\"\n" +
                "    },\n" +
                "    {\n" +
                "        \"id\": 3,\n" +
                "        \"username\": \"chifanshuijiao\"\n" +
                "    }\n" +
                "]"); // 响应结果
    }

    @Test
    public void testGet() {
        // 获得指定用户编号的用户
        webClient.get().uri("/users/get?id=1")
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("{\n" +
                "    \"id\": 1,\n" +
                "    \"username\": \"username:1\"\n" +
                "}"); // 响应结果
    }

    @Test
    public void testGet2() {
        // 获得指定用户编号的用户
        webClient.get().uri("/users/v2/get?id=1")
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("{\n" +
                "    \"id\": 1,\n" +
                "    \"username\": \"test\"\n" +
                "}"); // 响应结果
    }

    @Test
    public void testAdd() {
        Map<String, Object> params = new HashMap<>();
        params.put("username", "yudaoyuanma");
        params.put("password", "nicai");
        // 添加用户
        webClient.post().uri("/users/add")
                .bodyValue(params)
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("1"); // 响应结果。因为没有提供 content 的比较,所以只好使用 json 来比较。竟然能通过
    }

    @Test
    public void testAdd2() { // 发送文件的测试,可以参考 https://dev.to/shavz/sending-multipart-form-data-using-spring-webtestclient-2gb7 文章
        BodyInserters.FormInserter<String> formData = // Form Data 数据,需要这么拼凑
                BodyInserters.fromFormData("username", "yudaoyuanma")
                .with("password", "nicai");
        // 添加用户
        webClient.post().uri("/users/add2")
                .body(formData)
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("1"); // 响应结果。因为没有提供 content 的比较,所以只好使用 json 来比较。竟然能通过
    }


    @Test
    public void testUpdate() {
        Map<String, Object> params = new HashMap<>();
        params.put("id", 1);
        params.put("username", "yudaoyuanma");
        // 修改用户
        webClient.post().uri("/users/update")
                .bodyValue(params)
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody(Boolean.class) // 期望返回值类型是 Boolean
                .consumeWith((Consumer<EntityExchangeResult<Boolean>>) result -> // 通过消费结果,判断符合是 true 。
                        Assert.assertTrue("返回结果需要为 true", result.getResponseBody()));
    }

    @Test
    public void testDelete() {
        // 删除用户
        webClient.post().uri("/users/delete?id=1")
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody(Boolean.class) // 期望返回值类型是 Boolean
                .isEqualTo(true); // 这样更加简洁一些
//                .consumeWith((Consumer<EntityExchangeResult<Boolean>>) result -> // 通过消费结果,判断符合是 true 。
//                        Assert.assertTrue("返回结果需要为 true", result.getResponseBody()));
    }

}
  • 在类上,我们添加了 @AutoConfigureWebTestClient 注解,用于自动化配置我们稍后注入的 WebTestClient Bean 对象 webClient 。在后续的测试中,我们会看到都是通过 webClient 调用后端 API 接口。而每一次调用后端 API 接口,都会执行真正的后端逻辑。因此,整个逻辑,走的是集成测试,会启动一个真实的 Spring 环境。
  • 每次 API 接口的请求,都通过 RequestHeadersSpec 来构建。构建完成后,通过 RequestHeadersSpec#exchange() 方法来执行请求,返回 ResponseSpec 结果。
    • WebTestClient 的 #get()#head()#delete()#options() 方法,返回的是 RequestHeadersUriSpec 对象。
    • WebTestClient 的 #post()#put()#delete()#patch() 方法,返回的是 RequestBodyUriSpec 对象。
    • RequestHeadersUriSpec 和 RequestBodyUriSpec 都继承了 RequestHeadersSpec 接口。
  • 执行完请求后,通过调用 RequestBodyUriSpec 的各种断言方法,添加对结果的预期,相当于做断言。如果不符合预期,则会抛出异常,测试不通过。

3.2 单元测试

为了更好的展示 WebFlux 单元测试的示例,我们需要改写 UserController 的代码,让其会依赖 UserService 。修改点如下:

// UserService.java

@Service
public class UserService {

    public UserVO get(Integer id) {
        return new UserVO().setId(id).setUsername("test");
    }

}
  • 在 UserController 类中,增加 GET /users/v2/get 接口,获得指定用户编号的用户。代码如下: 
// UserController.java

@Autowired
private UserService userService;

/**
 * 获得指定用户编号的用户
 *
 * @param id 用户编号
 * @return 用户
 */
@GetMapping("/v2/get")
public Mono<UserVO> get2(@RequestParam("id") Integer id) {
    // 查询用户
    UserVO user = userService.get(id);
    // 返回
    return Mono.just(user);
}

 

  • 在代码中,我们注入了 UserService Bean 对象 userService ,然后在新增的接口方法中,会调用 UserService#get(Integer id) 方法,获得指定用户编号的用户。

创建 UserControllerTest2 测试类,我们来测试一下简单的 UserController 的新增的这个 API 操作。代码如下:

// UserControllerTest2.java

@RunWith(SpringRunner.class)
@WebFluxTest(UserController.class)
public class UserControllerTest2 {

    @Autowired
    private WebTestClient webClient;

    @MockBean
    private UserService userService;

    @Test
    public void testGet2() throws Exception {
        // Mock UserService 的 get 方法
        System.out.println("before mock:" + userService.get(1)); // <1.1>
        Mockito.when(userService.get(1)).thenReturn(
                new UserVO().setId(1).setUsername("username:1")); // <1.2>
        System.out.println("after mock:" + userService.get(1)); // <1.3>

        // 查询用户列表
        webClient.get().uri("/users/v2/get?id=1")
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("{\n" +
                "    \"id\": 1,\n" +
                "    \"username\": \"username:1\"\n" +
                "}"); // 响应结果
    }

}
  • 在类上添加 @WebFluxTest 注解,并且传入的是 UserController 类,表示我们要对 UserController 进行单元测试。
  • 同时,@WebFluxTest 注解,是包含了 @UserController 的组合注解,所以它会自动化配置我们稍后注入的 WebTestClient Bean 对象 mvc 。在后续的测试中,我们会看到都是通过 webClient 调用后端 API 接口。但是!每一次调用后端 API 接口,并不会执行真正的后端逻辑,而是走的 Mock 逻辑。也就是说,整个逻辑,走的是单元测试会启动一个 Mock 的 Spring 环境。

注意上面每个加粗的地方!

  • userService 属性,我们添加了 @MockBean 注解,实际这里注入的是一个使用 Mockito 创建的 UserService Mock 代理对象。如下图所示:

Spring Boot WebFlux 入门 

  • UserController 中,也会注入一个 UserService 属性,此时注入的就是该 Mock 出来的 UserService Bean 对象。

  • 默认情况下,

  • <1.1> 处,我们调用 UserService#get(Integer id) 方法,然后打印返回结果。执行结果如下:

before mock:null

 

  1. 结果竟然返回的是 null 空。理论来说,此时应该返回一个 id = 1 的 UserVO 对象。实际上,因为此时的 userService 是通过 Mockito 来 Mock 出来的对象,其所有调用它的方法,返回的都是空。
  • <1.2> 处,通过 Mockito 进行 Mock userService 的 #get(Integer id) 方法,当传入的 id = 1 方法参数时,返回 id = 1 并且 username = "username:1" 的 UserVO 对象。

  • <1.3> 处,再次调用 UserService#get(Integer id) 方法,然后打印返回结果。执行结果如下:

after aaa@qq.com

 

 

  1. 打印的就是我们 Mock 返回的 UserVO 对象。
  • 后续,使用 webClient 完成一次后端 API 调用,并进行断言结果是否正确。执行成功,单元测试通过。

可能胖友对单元测试不是很了解,这里在额外推荐一本书 《有效的单元测试》 。很薄,周末抽几个小时就能读完。

如果觉得本小节还不够,可以看看 《SpringBoot WebFlux Test – @WebFluxTest》 文章,写的还是不错的。

4. 全局统一返回

示例代码对应仓库:lab-27-webflux-02 。

在我们提供后端 API 给前端时,我们需要告前端,这个 API 调用结果是否成功:

  • 如果成功,成功的数据是什么。后续,前端会取数据渲染到页面上。
  • 如果失败,失败的原因是什么。一般,前端会将原因弹出提示给用户。

这样,我们就需要有统一的返回结果,而不能是每个接口自己定义自己的风格。一般来说,统一的全局返回信息如下:

  • 成功时,返回成功的状态码 + 数据
  • 失败时,返回失败的状态码 + 错误提示

在标准的 RESTful API 的定义,是推荐使用 HTTP 响应状态码 返回状态码。一般来说,我们实践很少这么去做,主要有如下原因:

  • 业务返回的错误状态码很多,HTTP 响应状态码无法很好的映射。例如说,活动还未开始、订单已取消等等。
  • 国内开发者对 HTTP 响应状态码不是很了解,可能只知道 200、403、404、500 几种常见的。这样,反倒增加学习成本。

所以,实际项目在实践时,我们会将状态码放在 Response Body 响应内容中返回。

在全局统一返回里,我们至少需要定义三个字段:

  • code:状态码。无论是否成功,必须返回。

    • 成功时,状态码为 0 。
    • 失败时,对应业务的错误码。

    关于这一块,也有团队实践时,增加了 success 字段,通过 true 和 false 表示成功还是失败。这个看每个团队的习惯吧。艿艿的话,还是偏好基于约定,返回 0 时表示成功。

  • data:数据。成功时,返回该字段。

  • message:错误提示。失败时,返回该字段。

那么,让我们来看两个示例:

// 成功响应
{
    code: 0,
    data: {
        id: 1,
        username: "yudaoyuanma"
    }
}

// 失败响应
{
    code: 233666,
    message: "徐妈太丑了"
}

下面,我们来看一个示例。

艿艿:考虑到不破坏 「2. 快速入门」 和 「3. 测试接口」 提供的示例,我们需要重新弄搭建一个。

4.1 引入依赖

在 「2.2 引入依赖」 一致。

4.2 Application

在 「2.3 Application」 一致。

4.3 CommonResult

在 cn.iocoder.springboot.lab27.springwebflux.core.vo 包路径,创建 CommonResult 类,用于全局统一返回。代码如下:

// CommonResult.java

public class CommonResult<T> implements Serializable {

    public static Integer CODE_SUCCESS = 0;

    /**
     * 错误码
     */
    private Integer code;
    /**
     * 错误提示
     */
    private String message;
    /**
     * 返回数据
     */
    private T data;

    /**
     * 将传入的 result 对象,转换成另外一个泛型结果的对象
     *
     * 因为 A 方法返回的 CommonResult 对象,不满足调用其的 B 方法的返回,所以需要进行转换。
     *
     * @param result 传入的 result 对象
     * @param <T> 返回的泛型
     * @return 新的 CommonResult 对象
     */
    public static <T> CommonResult<T> error(CommonResult<?> result) {
        return error(result.getCode(), result.getMessage());
    }

    public static <T> CommonResult<T> error(Integer code, String message) {
        Assert.isTrue(!CODE_SUCCESS.equals(code), "code 必须是错误的!");
        CommonResult<T> result = new CommonResult<>();
        result.code = code;
        result.message = message;
        return result;
    }

    public static <T> CommonResult<T> success(T data) {
        CommonResult<T> result = new CommonResult<>();
        result.code = CODE_SUCCESS;
        result.data = data;
        result.message = "";
        return result;
    }

    @JsonIgnore // 忽略,避免 jackson 序列化给前端
    public boolean isSuccess() { // 方便判断是否成功
        return CODE_SUCCESS.equals(code);
    }

    @JsonIgnore // 忽略,避免 jackson 序列化给前端
    public boolean isError() { // 方便判断是否失败
        return !isSuccess();
    }

    // ... 省略 setting/getting/toString 方法

}

4.4 GlobalResponseBodyHandler

在 cn.iocoder.springboot.lab27.springwebflux.core.web 包路径,创建 GlobalResponseBodyHandler 类,全局统一返回的处理器。代码如下:

// GlobalResponseBodyHandler.java

public class GlobalResponseBodyHandler extends ResponseBodyResultHandler {

    private static Logger LOGGER = LoggerFactory.getLogger(GlobalResponseBodyHandler.class);

    private static MethodParameter METHOD_PARAMETER_MONO_COMMON_RESULT;

    private static final CommonResult COMMON_RESULT_SUCCESS = CommonResult.success(null);

    static {
        try {
            // <1> 获得 METHOD_PARAMETER_MONO_COMMON_RESULT 。其中 -1 表示 `#methodForParams()` 方法的返回值
            METHOD_PARAMETER_MONO_COMMON_RESULT = new MethodParameter(
                    GlobalResponseBodyHandler.class.getDeclaredMethod("methodForParams"), -1);
        } catch (NoSuchMethodException e) {
            LOGGER.error("[static][获取 METHOD_PARAMETER_MONO_COMMON_RESULT 时,找不都方法");
            throw new RuntimeException(e);
        }
    }

    public GlobalResponseBodyHandler(List<HttpMessageWriter<?>> writers, RequestedContentTypeResolver resolver) {
        super(writers, resolver);
    }

    public GlobalResponseBodyHandler(List<HttpMessageWriter<?>> writers, RequestedContentTypeResolver resolver, ReactiveAdapterRegistry registry) {
        super(writers, resolver, registry);
    }

    @Override
    @SuppressWarnings("unchecked")
    public Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
        Object returnValue = result.getReturnValue();
        Object body;
        // <1.1> 处理返回结果为 Mono 的情况
        if (returnValue instanceof Mono) {
            body = ((Mono<Object>) result.getReturnValue())
                    .map((Function<Object, Object>) GlobalResponseBodyHandler::wrapCommonResult)
                    .defaultIfEmpty(COMMON_RESULT_SUCCESS);
        // <1.2> 处理返回结果为 Flux 的情况
        } else if (returnValue instanceof Flux) {
            body = ((Flux<Object>) result.getReturnValue())
                    .collectList()
                    .map((Function<Object, Object>) GlobalResponseBodyHandler::wrapCommonResult)
                    .defaultIfEmpty(COMMON_RESULT_SUCCESS);
        // <1.3> 处理结果为其它类型
        } else {
            body = wrapCommonResult(returnValue);
        }
        // <2>
        return writeBody(body, METHOD_PARAMETER_MONO_COMMON_RESULT, exchange);
    }

    private static Mono<CommonResult> methodForParams() {
        return null;
    }

    private static CommonResult<?> wrapCommonResult(Object body) {
        // 如果已经是 CommonResult 类型,则直接返回
        if (body instanceof CommonResult) {
            return (CommonResult<?>) body;
        }
        // 如果不是,则包装成 CommonResult 类型
        return CommonResult.success(body);
    }

}
  • 继承 WebFlux 的 ResponseBodyResultHandler 类,因为该类将 Response 的 body 写回给前端。所以,我们通过重写该类的 #handleResult(ServerWebExchange exchange, HandlerResult result) 方法,将返回结果进行使用 CommonResult 包装。
  • <1> 处,获得 METHOD_PARAMETER_MONO_COMMON_RESULT 。其中 -1 表示 #methodForParams() 方法的返回值类型 Mono<CommonResult> 。后续我们在#handleResult(ServerWebExchange exchange, HandlerResult result) 方法中,会使用到 METHOD_PARAMETER_MONO_COMMON_RESULT 。
  • 重写 #handleResult(ServerWebExchange exchange, HandlerResult result) 方法,将返回结果进行使用 CommonResult 包装。
    • <1.1> 处,处理返回结果为 Mono 的情况。通过调用 Mono#map(Function<? super T, ? extends R> mapper) 方法,将原返回结果,进行包装成 CommonResult<?> 。
    • <1.2> 处,处理返回结果为 Flux 的情况。先通过调用 Flux#collectList() 方法,将其转换成 Mono<List<T>> 对象,后续就是和 <1.1> 相同的逻辑。
    • <1.3> 处,处理结果为其它类型的情况,直接进行包装成 CommonResult<?> 。
  • <2> 处,调用父类方法 #writeBody(Object body, MethodParameter bodyParameter, ServerWebExchange exchange) 方法,实现将结果写回给前端。

在思路上,和 SpringMVC 使用 ResponseBodyAdvice + @ControllerAdvice 注解,是一致的。只是说,WebFlux 暂时没有提供这样的方式,所以咱只好通过继承 ResponseBodyResultHandler 类,重写其 #handleResult(ServerWebExchange exchange, HandlerResult result) 方法,将返回结果进行使用 CommonResult 包装。

4.5 WebFluxConfiguration

在 cn.iocoder.springboot.lab27.springwebflux.config 包路径下,创建 WebFluxConfiguration 配置类。代码如下:

// WebFluxConfiguration.java

@Configuration
public class WebFluxConfiguration {

    @Bean
    public GlobalResponseBodyHandler responseWrapper(ServerCodecConfigurer serverCodecConfigurer,
                                                     RequestedContentTypeResolver requestedContentTypeResolver) {
        return new GlobalResponseBodyHandler(serverCodecConfigurer.getWriters(), requestedContentTypeResolver);
    }

}
  • 在 #responseWrapper(serverCodecConfigurer, requestedContentTypeResolver) 方法中,我们创建了 4.4 GlobalResponseBodyHandler Bean 对象,实现对返回结果的包装。

4.6 UserController

在 cn.iocoder.springboot.lab27.springwebflux.controller 包路径下,创建 UserController 类。代码如下:

// UserController.java

@RestController
@RequestMapping("/users")
public class UserController {

    /**
     * 查询用户列表
     *
     * @return 用户列表
     */
    @GetMapping("/list")
    public Flux<UserVO> list() {
        // 查询列表
        List<UserVO> result = new ArrayList<>();
        result.add(new UserVO().setId(1).setUsername("yudaoyuanma"));
        result.add(new UserVO().setId(2).setUsername("woshiyutou"));
        result.add(new UserVO().setId(3).setUsername("chifanshuijiao"));
        // 返回列表
        return Flux.fromIterable(result);
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */
    @GetMapping("/get")
    public Mono<UserVO> get(@RequestParam("id") Integer id) {
        // 查询用户
        UserVO user = new UserVO().setId(id).setUsername("username:" + id);
        // 返回
        return Mono.just(user);
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */
    @GetMapping("/get2")
    public Mono<CommonResult<UserVO>> get2(@RequestParam("id") Integer id) {
        // 查询用户
        UserVO user = new UserVO().setId(id).setUsername("username:" + id);
        // 返回
        return Mono.just(CommonResult.success(user));
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */
    @GetMapping("/get3")
    public UserVO get3(@RequestParam("id") Integer id) {
        // 查询用户
        UserVO user = new UserVO().setId(id).setUsername("username:" + id);
        // 返回
        return user;
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */
    @GetMapping("/get4")
    public CommonResult<UserVO> get4(@RequestParam("id") Integer id) {
        // 查询用户
        UserVO user = new UserVO().setId(id).setUsername("username:" + id);
        // 返回
        return CommonResult.success(user);
    }

}
  • API 接口虽然比较多,但是我们可以先根据返回结果的类型,分成 Flux 和 Mono 两类。然后,艿艿这里又创建了 Mono 分类的四种情况的接口,就是 /users/get/users/get2/users/get3/users/get4 四个。胖友看下这四个接口的返回结果的类型,很容易就明白了。

  • 在 #get(Integer id) 方法,返回的结果是 UserVO 类型。这样,结果会被 GlobalResponseBodyHandler 拦截,包装成 CommonResult 类型返回。请求结果如下:

{
    "code": 0,
    "message": "",
    "data": {
        "id": 10,
        "username": "username:10"
    }
}

 

  • 会有 "message": "" 的返回的原因是,我们使用 SpringMVC 提供的 Jackson 序列化,对于 CommonResult 此时的 message = null 的情况下,会序列化它成 "message": "" 返回。实际情况下,不会影响前端处理。
  • 在 # get2(Integer id) 方法,返回的结果是 Mono<Common<UserVO>> 类型。结果虽然也会被 GlobalResponseBodyHandler 处理,但是不会二次再重复包装成 CommonResult 类型返回。

5. 全局异常处理

示例代码对应仓库:lab-27-webflux-02 。

在 「4. 全局统一返回」 中,我们已经定义了使用 CommonResult 全局统一返回,并且看到了成功返回的示例与代码。这一小节,我们主要是来全局异常处理,最终能也是通过 CommonResult 返回。

那么,我们就不哔哔,直接看着示例代码,遨游起来。

友情提示:该示例,基于 「4. 全局统一返回」 的 lab-27-webflux-02 的基础上,继续改造。

5.1 ServiceExceptionEnum

在 cn.iocoder.springboot.lab27.springwebflux.constants 包路径,创建 ServiceExceptionEnum 枚举类,枚举项目中的错误码。代码如下:

// ServiceExceptionEnum.java

public enum ServiceExceptionEnum {

    // ========== 系统级别 ==========
    SUCCESS(0, "成功"),
    SYS_ERROR(2001001000, "服务端发生异常"),
    MISSING_REQUEST_PARAM_ERROR(2001001001, "参数缺失"),

    // ========== 用户模块 ==========
    USER_NOT_FOUND(1001002000, "用户不存在"),

    // ========== 订单模块 ==========

    // ========== 商品模块 ==========
    ;

    /**
     * 错误码
     */
    private int code;
    /**
     * 错误提示
     */
    private String message;

    ServiceExceptionEnum(int code, String message) {
        this.code = code;
        this.message = message;
    }

    // ... 省略 getting 方法

}
  • 因为错误码是全局的,最好按照模块来拆分。如下是艿艿在 onemall 项目的实践: 
/**
 * 服务异常
 *
 * 参考 https://www.kancloud.cn/onebase/ob/484204 文章
 *
 * 一共 10 位,分成四段
 *
 * 第一段,1 位,类型
 *      1 - 业务级别异常
 *      2 - 系统级别异常
 * 第二段,3 位,系统类型
 *      001 - 用户系统
 *      002 - 商品系统
 *      003 - 订单系统
 *      004 - 支付系统
 *      005 - 优惠劵系统
 *      ... - ...
 * 第三段,3 位,模块
 *      不限制规则。
 *      一般建议,每个系统里面,可能有多个模块,可以再去做分段。以用户系统为例子:
 *          001 - OAuth2 模块
 *          002 - User 模块
 *          003 - MobileCode 模块
 * 第四段,3 位,错误码
 *       不限制规则。
 *       一般建议,每个模块自增。
 */

5.2 ServiceException

我们在一起讨论下 Service 逻辑异常的时候,如何进行返回。这里的逻辑异常,我们指的是,例如说用户名已经存在,商品库存不足等。一般来说,常用的方案选择,有两种:

  • 封装统一的业务异常类 ServiceException ,里面有错误码和错误提示,然后进行 throws 抛出。
  • 封装通用的返回类 CommonResult ,里面有错误码和错误提示,然后进行 return 返回。

一开始,我们选择了 CommonResult ,结果发现如下情况:

  • 因为 Spring @Transactional 声明式事务,是基于异常进行回滚的,如果使用 CommonResult 返回,则事务回滚会非常麻烦。
  • 当调用别的方法时,如果别人返回的是 CommonResult 对象,还需要不断的进行判断,写起来挺麻烦的。

所以,后来我们采用了抛出业务异常 ServiceException 的方式。

在 cn.iocoder.springboot.lab27.springwebflux.core.exception 包路径,创建 ServiceException 异常类,继承 RuntimeException 异常类,用于定义业务异常。代码如下:

// ServiceException.java

public final class ServiceException extends RuntimeException {

    /**
     * 错误码
     */
    private final Integer code;

    public ServiceException(ServiceExceptionEnum serviceExceptionEnum) {
        // 使用父类的 message 字段
        super(serviceExceptionEnum.getMessage());
        // 设置错误码
        this.code = serviceExceptionEnum.getCode();
    }

    // ... 省略 getting 方法

}
  • 提供传入 serviceExceptionEnum 参数的构造方法。具体的处理,看下代码和注释。

5.3 GlobalExceptionHandler

在 cn.iocoder.springboot.lab27.springwebflux.core.web 包路径,创建 GlobalExceptionHandler 类,全局统一返回的处理器。代码如下:

// GlobalExceptionHandler.java

@ControllerAdvice(basePackages = "cn.iocoder.springboot.lab27.springwebflux.controller")
public class GlobalExceptionHandler {

    private Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 处理 ServiceException 异常
     */
    @ResponseBody
    @ExceptionHandler(value = ServiceException.class)
    public CommonResult serviceExceptionHandler(ServiceException ex) {
        logger.debug("[serviceExceptionHandler]", ex);
        // 包装 CommonResult 结果
        return CommonResult.error(ex.getCode(), ex.getMessage());
    }

    /**
     * 处理 ServerWebInputException 异常
     *
     * WebFlux 参数不正确
     */
    @ResponseBody
    @ExceptionHandler(value = ServerWebInputException.class)
    public CommonResult serverWebInputExceptionHandler(ServerWebInputException ex) {
        logger.debug("[ServerWebInputExceptionHandler]", ex);
        // 包装 CommonResult 结果
        return CommonResult.error(ServiceExceptionEnum.MISSING_REQUEST_PARAM_ERROR.getCode(),
                ServiceExceptionEnum.MISSING_REQUEST_PARAM_ERROR.getMessage());
    }

    /**
     * 处理其它 Exception 异常
     */
    @ResponseBody
    @ExceptionHandler(value = Exception.class)
    public CommonResult exceptionHandler(Exception e) {
        // 记录异常日志
        logger.error("[exceptionHandler]", e);
        // 返回 ERROR CommonResult
        return CommonResult.error(ServiceExceptionEnum.SYS_ERROR.getCode(),
                ServiceExceptionEnum.SYS_ERROR.getMessage());
    }

}
  • 在 WebFlux 中,可以使用通过实现 ResponseBodyAdvice 接口,并添加 @ControllerAdvice 接口,拦截 Controller 的返回结果。注意,我们这里 @ControllerAdvice 注解,设置了 basePackages 属性,只拦截 "cn.iocoder.springboot.lab27.springwebflux.controller" 包,也就是我们定义的 Controller 。为什么呢?因为在项目中,我们可能会引入 Swagger 等库,也使用 Controller 提供 API 接口,那么我们显然不应该让 GlobalResponseBodyHandler 去拦截这些接口,毕竟它们并不需要我们去替它们做全局统一的返回
  • 我们定义了三个方法,通过添加 @ExceptionHandler 注解,定义每个方法对应处理的异常。并且,也添加了 @ResponseBody 注解,标记直接使用返回结果作为 API 的响应。
  • #serviceExceptionHandler(...) 方法,拦截处理 ServiceException 业务异常,直接使用该异常的 code + message 属性,构建出 CommonResult 对象返回。
  • #serverWebInputExceptionHandler(...) 方法,拦截处理 ServerWebInputException 请求参数异常,构建出错误码为 ServiceExceptionEnum.MISSING_REQUEST_PARAM_ERROR 的 CommonResult 对象返回。
  • #exceptionHandler(...) 方法,拦截处理 Exception 异常,构建出错误码为 ServiceExceptionEnum.SYS_ERROR 的 CommonResult 对象返回。这是一个兜底的异常处理,避免有一些其它异常,我们没有在 GlobalExceptionHandler 中,提供自定义的处理方式。

注意,在 #exceptionHandler(...) 方法中,我们还多使用 logger 打印了错误日志,方便我们接入 ELK 等日志服务,发起告警,通知我们去排查解决。如果胖友的系统里暂时没有日志服务,可以记录错误日志到数据库中,也是不错的选择。而其它两个方法,因为是更偏业务的,相对正常的异常,所以无需记录错误日志。

5.4 UserController

在 UserController 类中,我们添加两个 API 接口,抛出异常,方便我们测试全局异常处理的效果。代码如下:

// UserController.java

/**
 * 测试抛出 NullPointerException 异常
 */
@GetMapping("/exception-01")
public UserVO exception01() {
    throw new NullPointerException("没有粗面鱼丸");
}

/**
 * 测试抛出 ServiceException 异常
 */
@GetMapping("/exception-02")
public UserVO exception02() {
    throw new ServiceException(ServiceExceptionEnum.USER_NOT_FOUND);
}
  • 在 #exception01() 方法,抛出 NullPointerException 异常。这样,异常会被 GlobalExceptionHandler#exceptionHandler(...) 方法来拦截,包装成 CommonResult 类型返回。请求结果如下: 
{
    "code": 2001001000,
    "message": "服务端发生异常",
    "data": null
}
  • 在 #exception02() 方法,抛出 ServiceException 异常。这样,异常会被 GlobalExceptionHandler#serviceExceptionHandler(...) 方法来拦截,包装成 CommonResult 类型返回。请求结果如下: 
{
    "code": 1001002000,
    "message": "用户不存在",
    "data": null
}

5.5 简单小结

采用 ControllerAdvice + @ExceptionHandler 注解的方式,可以很方便的实现 WebFlux 的全局异常处理。不过这种方案存在一个弊端,不支持 WebFlux 的基于函数式编程方式。不过考虑到,绝大多数情况下,我们并不会采用基于函数式编程方式,所以这种方案还是没问题的。看了下 WebFlux 的官方文档,也是推荐这种方案,详细可见 《Web on Reactive Stack —— Spring WebFlux —— Managing Exceptions》 。

如果胖友真的需要支持 WebFlux 的基于函数式编程方式,可以看看 《Handling Errors in Spring WebFlux》 文章,通过继承 org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler 抽象类,实现自定义的全局异常处理器。

6. WebFilter 过滤器

示例代码对应仓库:lab-27-webflux-02 。

在 SpringMVC 中,我们可以通过实现 HandlerInterceptor 接口,拦截 SpringMVC 处理请求的过程,自定义前置和处理的逻辑。不了解这块的胖友,可以看看 《芋道 Spring Boot SpringMVC 入门》 的 「6. HandlerInterceptor 拦截器」 小节。

在 WebFlux 中,我们可以通过实现 WebFilter 接口,过滤 WebFlux 处理请求的过程,自定义前置和处理的逻辑。该接口代码如下:

// DemoWebFilterWebFilter.java

/**
 * Contract for interception-style, chained processing of Web requests that may
 * be used to implement cross-cutting, application-agnostic requirements such
 * as security, timeouts, and others.
 *
 * @author Rossen Stoyanchev
 * @since 5.0
 */
public interface WebFilter {

	/**
	 * Process the Web request and (optionally) delegate to the next
	 * {@code WebFilter} through the given {@link WebFilterChain}.
	 * @param exchange the current server exchange
	 * @param chain provides a way to delegate to the next filter
	 * @return {@code Mono<Void>} to indicate when request processing is complete
	 */
	Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain);

}
  • 因为 WebFilterChain 的 #filter(ServerWebExchange exchange) 方法,返回的是 Mono<Void> 对象,所以可以进行各种 Reactor 的操作。咳咳咳,当然需要胖友比较了解 Reactor 的使用,我们才能实现出的 WebFilter ,否则会觉得挺难用的。
  • 另外,WebFilterChain 是由多个 WebFilter 过滤器组成的链,其默认的实现为 DefaultWebFilterChain 。
  • 总体来说,从形态上和我们在 Servlet 看到的 FilterChain 和 Filter 是比较相似的,只是因为结合了 Reactor 响应式编程,所以编写时,差异蛮大的。

6.1 DemoWebFilter

下面,让我们来编写一个简单的 WebFilter 示例。

在 cn.iocoder.springboot.lab27.springwebflux.core.filter 包路径,创建 DemoWebFilter 类,一个简单的 WebFilter 示例。代码如下:

// DemoWebFilter.java

@Component
@Order(1)
public class DemoWebFilter implements WebFilter {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
        // <1> 继续执行请求
        return webFilterChain.filter(serverWebExchange)
                .doOnSuccess(new Consumer<Void>() { // <2> 执行成功后回调

                    @Override
                    public void accept(Void aVoid) {
                        logger.info("[accept][执行成功]");
                    }

                });
    }

}
  • 在类上,添加 @Component 注解,创建 DemoWebFilter Bean 对象。这样,该过滤器就已经加入了 WebFlux 的过滤器链中。目前,暂未内置支持根据请求路径 uri 等条件来配置是否过滤,需要我们自己在实现 #filter(serverWebExchange, webFilterChain) 方法来完成。
  • 在类上,添加 @Order 注解,设置过滤器的顺序。
  • 实现 #filter(serverWebExchange, webFilterChain) 方法,实现在请求执行完成后,打印一条执行成功的日志。
    • <1> 处,调用 WebFilterChain#filter(exchange) 方法,交给过滤器中的下一个过滤器,继续进行过滤处理,并返回 Mono<Void> 对象。
    • <2> 处,调用 Mono#doOnSuccess(Consumer<? super T> onSuccess) 方法,实现在请求执行完成后,打印一条执行成功的日志。这里,我们可以参考 《Reactor 文档 —— Mono》 ,实现各种其它操作。

???? 在后面的小节中,我们会看一个实现处理 Cors 跨域的 CorsWebFilter ,对理解 WebFilter 有一定的帮助。

6.2 Filtering Handler Functions

在基于函数式编程方式中,可以使用如下的方式,实现对每个路由的过滤处理。代码如下:

// UserRouter.java

@Bean
public RouterFunction<ServerResponse> demo2RouterFunction() {
    return route(GET("/users2/demo2"), request -> ok().bodyValue("demo"))
            .filter(new HandlerFilterFunction<ServerResponse, ServerResponse>() {

                @Override
                public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
                    return next.handle(request).doOnSuccess(new Consumer<ServerResponse>() { // 执行成功后回调

                        @Override
                        public void accept(ServerResponse serverResponse) {
                            logger.info("[accept][执行成功]");
                        }

                    });
                }

            });
}

因为实际场景下,使用到基于函数式编程方式比较少,这里就不扩展开来讲。感兴趣的胖友,可以看看 《Web on Reactive Stack —— Spring WebFlux —— Filtering Handler Functions》 文档。

7. Servlet、Filter、Listener

目前测试下来,java.servlet 提供的 Servlet、Filter、Listener 组件,无法在 WebFlux 中使用。测试的示例,可见 lab-27-webflux-03 。

艿艿翻了下 Spring Security 对 WebFlux 的支持,也是通过实现 WebFlux 接口的 WebFilterChainProxy 过滤器,即在 「6. WebFilter 过滤器」 中看到的内容。

8. Cors 跨域

示例代码对应仓库:lab-27-webflux-02 。

在前后端分离之后,我们会碰到跨域的问题。例如说,前端在 http://www.iocoder.cn 域名下,而后端 API 在 http://api.iocoder.cn 域名下。

对跨域不是很了解的胖友,可以看看阮大的 《跨域资源共享 CORS 详解》 文章。???? 当然,也可以先继续本文的阅读。

解决跨域的方式有很多,例如说,在 Nginx 上配置处理跨域请求的参数。又例如说,项目中有网关服务,统一配置处理。当然,本文既然是 Spring Boot WebFlux 入门,那么必然只使用 WebFlux 来解决跨域。目前一共有三种方案:

  • 方式一,使用 @CrossCors 注解,配置每个 API 接口。
  • 方式二,使用 CorsRegistry.java 注册表,配置每个 API 接口。
  • 方案三,使用 CorsFilter.java 过滤器,处理跨域请求。

其中,方案一和方案二,本质是相同的方案,只是配置方式不同。想要理解底层原理的胖友,可以看看 AbstractHandlerMapping#getHandler(ServerWebExchange exchange) 方法,获得处理器之后,会进行下 Cors 跨域的处理。

友情提示:该示例,基于 「6. WebFilter 过滤器」 的 lab-27-webflux-02 的基础上,继续改造。

8.1 @CrossCors

@CrossCors 注解,添加在类或方法上,标记该类/方法对应接口的 Cors 信息。

@CrossCors 注解的常用属性,如下:

  • origins 属性,设置允许的请求来源。[] 数组,可以填写多个请求来源。默认值为 * 。
  • value 属性,和 origins 属性相同,是它的别名。
  • allowCredentials 属性,是否允许客户端请求发送 Cookie 。默认为 false ,不允许请求发送 Cookie 。
  • maxAge 属性,本次预检请求的有效期,单位为秒。默认值为 1800 秒。

@CrossCors 注解的不常用属性,如下:

  • methods 属性,设置允许的请求方法。[] 数组,可以填写多个请求方法。默认值为 GET + POST 。
  • allowedHeaders 属性,允许的请求头 Header 。[] 数组,可以填写多个请求来源。默认值为 * 。
  • exposedHeaders 属性,允许的响应头 Header 。[] 数组,可以填写多个请求来源。默认值为 * 。

一般情况下,我们在每个 Controller 上,添加 @CrossCors 注解即可。当然,如果某个 API 接口希望做自定义的配置,可以在 Method 方法上添加。示例如下:

// TestController.java

@RestController
@RequestMapping("/test")
@CrossOrigin(origins = "*", allowCredentials = "true") // 允许所有来源,允许发送 Cookie
public class TestController {

    /**
     * 获得指定用户编号的用户
     *
     * @return 用户
     */
    @GetMapping("/get")
    @CrossOrigin(allowCredentials = "false") // 允许所有来源,不允许发送 Cookie
    public Mono<UserVO> get() {
        // 查询用户
        UserVO user =  new UserVO().setId(1).setUsername(UUID.randomUUID().toString());
        // 返回
        return Mono.just(user);
    }

}

在绝大数场合下,肯定是在 Controller 上,添加 @CrossOrigin(allowCredentials = "true") 即可

8.2 CorsRegistry

显然,在每个 Controller 上配置 @CrossOrigin 注解,是挺麻烦一事。所以更多的情况下,我们会选择配置 CorsRegistry 注册表。

修改 WebFluxConfiguration 配置类,增加 CorsRegistry 相关的配置。代码如下:

// WebFluxConfiguration.java

@Configuration
public class WebFluxConfiguration implements WebFluxConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        // 添加全局的 CORS 配置
        registry.addMapping("/**") // 匹配所有 URL ,相当于全局配置
                .allowedOrigins("*") // 允许所有请求来源
                .allowCredentials(true) // 允许发送 Cookie
                .allowedMethods("*") // 允许所有请求 Method
                .allowedHeaders("*") // 允许所有请求 Header
//                .exposedHeaders("*") // 允许所有响应 Header
                .maxAge(1800L); // 有效期 1800 秒,2 小时
    }

}
  • 需要实现 WebFluxConfigurer 接口,重写 #addCorsMappings(CorsRegistry registry) 方法,实现自定义的 CORS 配置。
  • 这里配置匹配的路径是 /** ,从而实现全局 CORS 配置。
  • 如果想要配置单个路径的 CORS 配置,可以通过 CorsRegistry#addMapping(String pathPattern) 方法,继续往其中添加 CORS 配置。
  • 如果胖友想要更安全,可以 originns 属性,只填写允许的前端域名地址。

8.3 CorsWebFilter

在 Spring WebFlux 中,内置提供 CorsWebFilter 过滤器,实现对 CORS 的处理。

配置方式很简单,修改 WebFluxConfiguration 配置类,增加 CorsWebFilter 相关的配置。代码如下:

// WebFluxConfiguration.java

@Bean
@Order(0) // 设置 order 排序。这个顺序很重要哦,为避免麻烦请设置在最前
public CorsWebFilter corsFilter() {
    // 创建 UrlBasedCorsConfigurationSource 配置源,类似 CorsRegistry 注册表
    UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
    // 创建 CorsConfiguration 配置,相当于 CorsRegistration 注册信息
    CorsConfiguration config = new CorsConfiguration();
    config.setAllowedOrigins(Collections.singletonList("*")); // 允许所有请求来源
    config.setAllowCredentials(true); // 允许发送 Cookie
    config.addAllowedMethod("*"); // 允许所有请求 Method
    config.setAllowedHeaders(Collections.singletonList("*")); // 允许所有请求 Header
    // config.setExposedHeaders(Collections.singletonList("*")); // 允许所有响应 Header
    config.setMaxAge(1800L); // 有效期 1800 秒,2 小时
    source.registerCorsConfiguration("/**", config);
    // 创建 CorsWebFilter 对象
    return new CorsWebFilter(source); // 创建 CorsFilter 过滤器
}
  • 艿艿已经添加了详细的注释,胖友自己看下噢。效果上,和 「8.2 CorsRegistry」 是一致的。

下面,让我们来看看 CorsWebFilter 的源码,了解下 CORS 跨域的具体逻辑,同时也能再看一个 WebFliter 的示例。代码如下:

// CorsWebFilter.java

public class CorsWebFilter implements WebFilter {

	/**
	 * Cors 配置源
	 */
	private final CorsConfigurationSource configSource;
	/**
	 * Cors 处理器,默认为 DefaultCorsProcessor
	 */
	private final CorsProcessor processor;

	public CorsWebFilter(CorsConfigurationSource configSource) {
		this(configSource, new DefaultCorsProcessor());
	}

	public CorsWebFilter(CorsConfigurationSource configSource, CorsProcessor processor) {
		Assert.notNull(configSource, "CorsConfigurationSource must not be null");
		Assert.notNull(processor, "CorsProcessor must not be null");
		this.configSource = configSource;
		this.processor = processor;
	}

	@Override
	public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
		ServerHttpRequest request = exchange.getRequest();
		// 如果是 CORS 跨域请求
		if (CorsUtils.isCorsRequest(request)) {
			// 获得该接口的 CORS 跨域请求的配置
			CorsConfiguration corsConfiguration = this.configSource.getCorsConfiguration(exchange);
			if (corsConfiguration != null) {
				// 执行 CORS 跨域请求的处理,
				boolean isValid = this.processor.process(corsConfiguration, exchange);
				// !isValid 表示,如果跨域请求的校验不通过
				// CorsUtils.isPreFlightRequest(request) 表示,是 OPTIONS “预检请求”
				if (!isValid || CorsUtils.isPreFlightRequest(request)) {
					// 直接返回空的 Mono ,不进行后续的处理
					return Mono.empty();
				}
			}
		}
		// 继续过滤器的处理
		return chain.filter(exchange);
	}

}

8.4 如何选择

至此,我们已经学习完三种 WebFlux 配置 CORS 的方式。**个人建议的话,推荐使用 「8.3 CorsWebFilter」 方式。**????

在 WebFlux 中,是先执行完 WebFilter 的过滤链,在通过 AbstractHandlerMapping#getHandler(ServerWebExchange exchange) 方法,获得处理器。那么,方案一和方案二,需要多走完整个 WebFilter 的过滤链。这样仿佛也没什么问题?!

但是,在前端使用符合 CORS 规范的网络库时,例如说 Vue 常用的网络库 axios ,在发起非简单请求时,会自动先先发起 OPTIONS “预检”请求,要求服务器确认是否能够这样请求。对于这种“预检”请求,最好被 CorsWebFilter 直接处理了,而不要走完整个 WebFilter 的过滤链。

所以,推荐使用 「8.3 CorsWebFilter」 方式。并且,CorsWebFilter 最好放在整个过滤器链的第一个

如果胖友觉得本小节不够尽兴,可以补充看看 《Web on Reactive Stack —— Spring WebFlux —— CORS》 文档。

9. 集成响应式的 MongoDB

示例代码对应仓库:lab-27-webflux-mongodb 。

在 《芋道 Spring Boot MongoDB 入》 中,我门学习了如何使用 Spring Data MongoDB ,当时选择的是 spring-data-mongodb 库,使用了 MongoDB 的 CRUD 操作。

在本小节,我们来整合 Spring Data MongoDB 到 WebFlux 中,使用 MongoDB 的响应式的 CRUD 操作。然后实现用户的增删改查接口。接口列表如下:

请求方法 URL 功能
GET /users/list 查询用户列表
GET /users/get 获得指定用户编号的用户
POST /users/add 添加用户
POST /users/update 更新指定用户编号的用户
POST /users/delete 删除指定用户编号的用户

9.1 引入依赖

在 pom.xml 文件中,引入相关依赖。

 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-27-webflux-mongodb</artifactId>

    <dependencies>
        <!-- 实现对 Spring WebFlux 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>

        <!-- 自动化配置响应式的 Spring Data Mongodb -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>

        <!-- 方便等会写单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>

9.2 应用配置文件

在 application.yml 中,添加 MongoDB 配置,如下:

spring:
  data:
    # MongoDB 配置项,对应 MongoProperties 类
    mongodb:
      host: 127.0.0.1
      port: 27017
      database: yourdatabase
      username: test01
      password: password01
      # 上述属性,也可以只配置 uri

logging:
  level:
    org:
      springframework:
        data:
          mongodb:
            core: DEBUG # 打印 mongodb 操作的具体语句。生产环境下,不建议开启。

9.3 Application

创建 Application.java 类,配置 @SpringBootApplication 注解即可。代码如下:

// Application.java

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

9.5 UserRepository

在 cn.iocoder.springboot.lab27.springwebflux.dao 包路径下,创建 UserRepository 接口。代码如下:

// UserRepository.java

public interface UserRepository extends ReactiveMongoRepository<UserDO, Integer> {

    Mono<UserDO> findByUsername(String username);

}
// ReactiveMongoRepository.java

<S extends T> Mono<S> insert(S entity);
<S extends T> Flux<S> insert(Iterable<S> entities);
<S extends T> Flux<S> insert(Publisher<S> entities);
<S extends T> Flux<S> findAll(Example<S> example);
<S extends T> Flux<S> findAll(Example<S> example, Sort sort);

 

  • 所有的返回值,都使用 Mono 和 Flux 包装过。
  • #findByUsername(String username) 方法,定义了查询指定用户名的用户,返回的结果也算是使用 Mono 包装过。

9.5 UserController

在 cn.iocoder.springboot.lab27.springwebflux.controller 包路径下,创建 UserController 类。代码如下:

// UserController.java

@RestController
@RequestMapping("/users")
public class UserController {

    private static final UserDO USER_NULL = new UserDO();

    @Autowired
    private UserRepository userRepository;

    /**
     * 查询用户列表
     *
     * @return 用户列表
     */
    @GetMapping("/list")
    public Flux<UserVO> list() {
        // 返回列表
        return userRepository.findAll()
                .map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */
    @GetMapping("/get")
    public Mono<UserVO> get(@RequestParam("id") Integer id) {
        // 返回
        return userRepository.findById(id)
                .map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));
    }

    /**
     * 添加用户
     *
     * @param addDTO 添加用户信息 DTO
     * @return 添加成功的用户编号
     */
    @PostMapping("add")
    public Mono<Integer> add(UserAddDTO addDTO) {
        // 查询用户
        Mono<UserDO> user = userRepository.findByUsername(addDTO.getUsername());

        // 执行插入
        return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                .flatMap(new Function<UserDO, Mono<Integer>>() {

                    @Override
                    public Mono<Integer> apply(UserDO userDO) {
                        if (userDO != USER_NULL) {
                            // 返回 -1 表示插入失败。
                            // 实际上,一般是抛出 ServiceException 异常。因为这个示例项目里暂时没做全局异常的定义,所以暂时返回 -1 啦
                            return Mono.just(-1);
                        }
                        // 将 addDTO 转成 UserDO
                        userDO = new UserDO().setId((int) (System.currentTimeMillis() / 1000)) // 使用当前时间戳的描述,作为 ID 。
                                .setUsername(addDTO.getUsername())
                                .setPassword(addDTO.getPassword())
                                .setCreateTime(new Date());
                        // 插入数据库
                        return userRepository.insert(userDO).map(UserDO::getId);
                    }

                });
    }

    /**
     * 更新指定用户编号的用户
     *
     * @param updateDTO 更新用户信息 DTO
     * @return 是否修改成功
     */
    @PostMapping("/update")
    public Mono<Boolean> update(UserUpdateDTO updateDTO) {
        // 查询用户
        Mono<UserDO> user = userRepository.findById(updateDTO.getId());

        // 执行更新
        return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                .flatMap(new Function<UserDO, Mono<Boolean>>() {

                    @Override
                    public Mono<Boolean> apply(UserDO userDO) {
                        // 如果不存在该用户,则直接返回 false 失败
                        if (userDO == USER_NULL) {
                            return Mono.just(false);
                        }
                        // 查询用户是否存在
                        return userRepository.findByUsername(updateDTO.getUsername())
                                .defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                                .flatMap(new Function<UserDO, Mono<? extends Boolean>>() {

                                    @Override
                                    public Mono<? extends Boolean> apply(UserDO usernameUserDO) {
                                        // 如果用户名已经使用(该用户名对应的 id 不是自己,说明就已经被使用了)
                                        if (usernameUserDO != USER_NULL && !Objects.equals(updateDTO.getId(), usernameUserDO.getId())) {
                                            return Mono.just(false);
                                        }
                                        // 执行更新
                                        userDO.setUsername(updateDTO.getUsername());
                                        userDO.setPassword(updateDTO.getPassword());
                                        return userRepository.save(userDO).map(userDO -> true); // 返回 true 成功
                                    }

                                });
                    }

                });
    }

    /**
     * 删除指定用户编号的用户
     *
     * @param id 用户编号
     * @return 是否删除成功
     */
    @PostMapping("/delete") // URL 修改成 /delete ,RequestMethod 改成 DELETE
    public Mono<Boolean> delete(@RequestParam("id") Integer id) {
        // 查询用户
        Mono<UserDO> user = userRepository.findById(id);

        // 执行删除。这里仅仅是示例,项目中不要物理删除,而是标记删除
        return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                .flatMap(new Function<UserDO, Mono<Boolean>>() {

                    @Override
                    public Mono<Boolean> apply(UserDO userDO) {
                        // 如果不存在该用户,则直接返回 false 失败
                        if (userDO == USER_NULL) {
                            return Mono.just(false);
                        }
                        // 执行删除
                        return userRepository.deleteById(id).map(aVoid -> true); // 返回 true 成功
                    }

                });
    }

}
  • 示例为了让示例更加简洁一点,让 Controller 直接调用了 Repository 的代码。实际项目中,还是按照 Controller -> Service -> Repository 的调用关系,嘿嘿。
  • 示例主要考虑胖友可能对 Reactor 的 API 方法不是很熟悉,所以并未使用 Java Lambda 表达式简化代码。实际项目中,Function 可以进行简化。

在 Reactor Mono 中,有两个非常重要的 API 方法,如下:

// Mono.java

public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
    // ... 省略实现
}

public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>>
		transformer) {
    // ... 省略实现
}
  • 本质上,使用这个两个方法的目的,都是映射和转换,是一致的。

  • 两者的方法返回,结果都是 Mono<R> 类型,也是一致的。

  • 两者的方法参数,#map(...) 方法是 Function<? super T, ? extends R> ,而 #flatMap(...) 方法是 Function<? super T, ? extends Mono<? extends R>> 。让我们来看看 Function 接口的代码:

// Function.java

public interface Function<T, R> {

    R apply(T t);

}

 

  • Function 接口的第二个泛型 R ,代表 #apply(T t) 接口方法的返回类型。
  • #map(...) 方法对应的 Function 的 R 泛型,为 ? extends R ;#flatMap(...) 方法对应的 Function 的 R 泛型,为 ? extends Mono<? extends R> 。
  • 什么意思呢?#map(...) 方法是其 Function 执行结果在包装上 Mono ;#flatMap(...) 方法是其 Function 执行结果返回就已经包装好 Mono 。

???? 总结来说,#map(...) 和 #flatMap(...) 方法的使用差异是:

  • 如果希望 Function 返回的结果,自动被 Mono 包装,则使用 #map(...) 方法。
  • 如果希望 Function 返回的结果,是自己手动进行 Mono 包装,则使用 #flatMap(...) 方法。

因此,在 UserController#delete(Integer id) 方法中,存在多层嵌套的逻辑,需要先进行查询用户是否存在,再执行删除用户操作。所以,第一层我们使用了 Mono#flatMap(transformer) 方法,第二层我们使用了 Mono#map(mapper) 方法。

另外,对于 #map(...) 和 #flatMap(...) 方法来说,必须保证 Mono 里的数据非空,但是查询库显然会存在查找不到数据,返回 null 的情况,所以这里我们使用了 Mono#defaultIfEmpty(defaultV) 方法,传入空 UserDO 的标识对象 USER_NULL 。否则,#map(...) 和 #flatMap(...) 方法的逻辑,是不会执行的。

???? 是不是不太习惯使用 Reactor 的方式编写代码, = = 艿艿也是~~~

10. 集成响应式的 Redis

示例代码对应仓库:lab-27-webflux-redis 。

在 《芋道 Spring Boot Redis 入》 中,我门学习了如何使用 Spring Data Redis ,当时选择的是 spring-data-redis 库,使用了 Redis 的 key-value 操作。

在本小节,我们来整合 Spring Data Redis 到 WebFlux 中,使用 Redis 的响应式的 key-value 操作。然后实现用户缓存的读取和修改的接口。接口列表如下:

请求方法 URL 功能
GET /users/get 查询指定用户的缓存
POST /users/set 修改指定用户的缓存

10.1 引入依赖

在 pom.xml 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-27-webflux-redis</artifactId>

    <dependencies>
        <!-- 实现对 Spring WebFlux 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>

        <!-- 自动化配置响应式的 Spring Data Jedis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>

        <!-- 方便等会写单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>

因为 Jedis 暂时提供非阻塞的异步 API ,具体可以看看 ISSUE#713 。所以,如果我们使用 Spring Data Redis 提供的 Redis 响应式操作,只能暂时使用 Lettuce 或 Redisson 作为 Redis 的客户端。

10.2 应用配置文件

在 application.yml 中,添加 Redis 配置,如下:

spring:
  # 对应 RedisProperties 类
  redis:
    host: 127.0.0.1
    port: 6379
    password: # Redis 服务器密码,默认为空。生产中,一定要设置 Redis 密码!
    database: 0 # Redis 数据库号,默认为 0 。
    timeout: 0 # Redis 连接超时时间,单位:毫秒。

10.3 RedisConfiguration

在 cn.iocoder.springboot.lab27.springwebflux.config 包路径下,创建 RedisConfiguration 配置类。代码如下:

// RedisConfiguration.java

@Configuration
public class RedisConfiguration {

    @Bean
    public ReactiveRedisTemplate<String, Object> commonRedisTemplate(ReactiveRedisConnectionFactory factory) {
        RedisSerializationContext<String, Object> serializationContext =
                RedisSerializationContext.<String, Object>newSerializationContext(RedisSerializer.string())
                        .value(RedisSerializer.json()) // 创建通用的 GenericJackson2JsonRedisSerializer 作为序列化
                        .build();
        return new ReactiveRedisTemplate<>(factory, serializationContext);
    }

    @Bean
    public ReactiveRedisTemplate<String, UserCacheObject> userRedisTemplate(ReactiveRedisConnectionFactory factory) {
        RedisSerializationContext<String, UserCacheObject> serializationContext =
                RedisSerializationContext.<String, UserCacheObject>newSerializationContext(RedisSerializer.string())
                        .value(new Jackson2JsonRedisSerializer<>(UserCacheObject.class)) // 创建专属 UserCacheObject 的 Jackson2JsonRedisSerializer 作为序列化
                        .build();
        return new ReactiveRedisTemplate<>(factory, serializationContext);
    }

}

10.4 Application

创建 Application.java 类,配置 @SpringBootApplication 注解即可。代码如下:

// Application.java

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

10.5 UserController

在 cn.iocoder.springboot.lab27.springwebflux.controller 包路径下,创建 UserController 类。代码如下:

// UserController.java

@RestController
@RequestMapping("/users")
public class UserController {

    // ========== 使用通用的 ReactiveRedisTemplate 的方式 ==========

    @Autowired
    private ReactiveRedisTemplate<String, Object> commonRedisTemplate;

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */
    @GetMapping("/get")
    public Mono<UserCacheObject> get(@RequestParam("id") Integer id) {
        String key = genKey(id);
        return commonRedisTemplate.opsForValue().get(key)
                .map(o -> (UserCacheObject) o);
    }

    /**
     * 设置指定用户的信息
     *
     * @param user 用户
     * @return 是否成功
     */
    @PostMapping("/set")
    public Mono<Boolean> set(UserCacheObject user) {
        String key = genKey(user.getId());
        return commonRedisTemplate.opsForValue().set(key, user);
    }

    private static String genKey(Integer id) {
        return "user::" + id;
    }

    // ========== 使用专属的 ReactiveRedisTemplate 的方式 =========

    @Autowired
    private ReactiveRedisTemplate<String, UserCacheObject> userRedisTemplate;

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */
    @GetMapping("/v2/get")
    public Mono<UserCacheObject> getV2(@RequestParam("id") Integer id) {
        String key = genKeyV2(id);
        return userRedisTemplate.opsForValue().get(key);
    }

    /**
     * 设置指定用户的信息
     *
     * @param user 用户
     * @return 是否成功
     */
    @PostMapping("/v2/set")
    public Mono<Boolean> setV2(UserCacheObject user) {
        String key = genKeyV2(user.getId());
        return userRedisTemplate.opsForValue().set(key, user);
    }

    private static String genKeyV2(Integer id) {
        return "user::v2::" + id;
    }

}
  • 示例为了让示例更加简洁一点,让 Controller 直接调用了 RedisTemplate 的代码。实际项目中,还是按照 Controller -> Service -> Repository 的调用关系,嘿嘿。

  • 示例提供了两组 API ,差别在于使用通用的 commonRedisTemplate 还是专属的 userRedisTemplate 。让我们看看这两组 API ,存储到 Reids 中的结果的差别。打开 redis-cli 连接 Redis 服务,整个执行过程如下:

# 通用
127.0.0.1:6379> get user::1
"{\"@class\":\"cn.iocoder.springboot.lab27.springwebflux.cacheobject.UserCacheObject\",\"id\":1,\"name\":\"nicai\",\"gender\":1}"

# 专属
127.0.0.1:6379> get user::v2::1
"{\"id\":1,\"name\":\"nicai\",\"gender\":1}"
    • 相同点,是 value 值都是 JSON 序列化 UserCacheObject 的结果。
    • 不同点,是通用相比专属来说,多了 @class 属性。因为 GenericJackson2JsonRedisSerializer 是通用的序列化类,反序列化时,通过 @class 属性,可以知道创建的对象的类型。而 Jackson2JsonRedisSerializer 是专属的序列化类,反序列化时,即使没有 @class 属性,也知道创建的对象的类型。

在实际使用 Spring Data Redis 的时候,一般情况下,我们直接使用 StringRedisTemplate 即可。而在有序列化的时候,给每一个需要序列化的类,创建其专属的 RedisTemplate 。

11. 集成响应式的 Elasticsearch

示例代码对应仓库:lab-27-webflux-elasticsearch 。

在 《芋道 Spring Boot Elasticsearch 入》 中,我门学习了如何使用 Spring Data Elasticsearch ,当时选择的是 spring-data-elasticsearch 库,使用了 Elasticsearch 的 CRUD 操作。

在本小节,我们来整合 Spring Data Elasticsearch 到 WebFlux 中,使用 Elasticsearch 的响应式的 CRUD 操作。然后实现用户的增删改查接口。接口列表如下:

请求方法 URL 功能
GET /users/list 查询用户列表
GET /users/get 获得指定用户编号的用户
POST /users/add 添加用户
POST /users/update 更新指定用户编号的用户
POST /users/delete 删除指定用户编号的用户

因为使用的是 Spring Data Elasticsearch ,所以和 「9. 集成响应式的 MongoDB」 是非常类似的。

11.1 引入依赖

在 pom.xml 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-27-webflux-elasticsearch</artifactId>

    <dependencies>
        <!-- 实现对 Spring WebFlux 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>

        <!-- 自动化配置响应式的 Spring Data Elasticsearch -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>

        <!-- 方便等会写单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>

自 spring-data-elasticsearch 的 3.2.0.RELEASE 版本开始,其基于响应式的 WebClient ,封装了请求 Elasticsearch HTTP API 的 DefaultReactiveElasticsearchClient 客户端。这意味着什么?Spring Data Elasticsearch 开始支持响应式,并且开始使用 Elasticsearch HTTP API

友情提示:Spring Data Elasticsearch 之前的版本,使用的是 Elasticsearch TCP API 。

11.2 应用配置文件

在 application.yml 中,添加 MongoDB 配置,如下:

spring:
  data:
    # Elasticsearch 配置项
    elasticsearch:
      client:
        # 对应 ReactiveRestClientProperties 配置类
        reactive:
          endpoints: 127.0.0.1:9200 # ES Restful API 地址

11.3 ElasticsearchConfiguration

在 cn.iocoder.springboot.lab27.springwebflux.config 包路径下,创建 ElasticsearchConfiguration 配置类。代码如下:

// ElasticsearchConfiguration.java

@Configuration
@EnableReactiveElasticsearchRepositories // 开启响应式的 Elasticsearch 的 Repository 的自动化配置
public class ElasticsearchConfiguration {
}

11.4 Application

创建 Application.java 类,配置 @SpringBootApplication 注解即可。代码如下:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
//        System.setProperty("es.set.netty.runtime.available.processors", "false");
        SpringApplication.run(Application.class, args);
    }

}

11.5 UserRepository

在 cn.iocoder.springboot.lab27.springwebflux.dao 包路径下,创建 UserRepository 接口。代码如下:

// UserRepository.java

public interface UserRepository extends ReactiveElasticsearchRepository<UserDO, Integer> {

    Mono<UserDO> findByUsername(String username);

}
// ReactiveElasticsearchRepository.java
@NoRepositoryBean
public interface ReactiveElasticsearchRepository<T, ID> extends ReactiveSortingRepository<T, ID> {

}

 

  • 其定义的接口,都从其父接口 ReactiveCrudRepository 和 ReactiveSortingRepository 所继承。
  • #findByUsername(String username) 方法,定义了查询指定用户名的用户,返回的结果也算是使用 Mono 包装过。

11.6 UserController

在 cn.iocoder.springboot.lab27.springwebflux.controller 包路径下,创建 UserController 类。代码如下:

// UserController.java

@RestController
@RequestMapping("/users")
public class UserController {

    private static final UserDO USER_NULL = new UserDO();

    @Autowired
    private UserRepository userRepository;

    /**
     * 查询用户列表
     *
     * @return 用户列表
     */
    @GetMapping("/list")
    public Flux<UserVO> list() {
        // 返回列表
        return userRepository.findAll()
                .map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */
    @GetMapping("/get")
    public Mono<UserVO> get(@RequestParam("id") Integer id) {
        // 返回
        return userRepository.findById(id)
                .map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));
    }

    /**
     * 添加用户
     *
     * @param addDTO 添加用户信息 DTO
     * @return 添加成功的用户编号
     */
    @PostMapping("add")
    public Mono<Integer> add(UserAddDTO addDTO) {
        // 查询用户
        Mono<UserDO> user = userRepository.findByUsername(addDTO.getUsername());

        // 执行插入
        return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                .flatMap(new Function<UserDO, Mono<Integer>>() {

                    @Override
                    public Mono<Integer> apply(UserDO userDO) {
                        if (userDO != USER_NULL) {
                            // 返回 -1 表示插入失败。
                            // 实际上,一般是抛出 ServiceException 异常。因为这个示例项目里暂时没做全局异常的定义,所以暂时返回 -1 啦
                            return Mono.just(-1);
                        }
                        // 将 addDTO 转成 UserDO
                        userDO = new UserDO().setId((int) (System.currentTimeMillis() / 1000)) // 使用当前时间戳的描述,作为 ID 。
                                .setUsername(addDTO.getUsername())
                                .setPassword(addDTO.getPassword())
                                .setCreateTime(new Date());
                        // 插入数据库
                        return userRepository.insert(userDO).map(UserDO::getId);
                    }

                });
    }

    /**
     * 更新指定用户编号的用户
     *
     * @param updateDTO 更新用户信息 DTO
     * @return 是否修改成功
     */
    @PostMapping("/update")
    public Mono<Boolean> update(UserUpdateDTO updateDTO) {
        // 查询用户
        Mono<UserDO> user = userRepository.findById(updateDTO.getId());

        // 执行更新
        return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                .flatMap(new Function<UserDO, Mono<Boolean>>() {

                    @Override
                    public Mono<Boolean> apply(UserDO userDO) {
                        // 如果不存在该用户,则直接返回 false 失败
                        if (userDO == USER_NULL) {
                            return Mono.just(false);
                        }
                        // 查询用户是否存在
                        return userRepository.findByUsername(updateDTO.getUsername())
                                .defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                                .flatMap(new Function<UserDO, Mono<? extends Boolean>>() {

                                    @Override
                                    public Mono<? extends Boolean> apply(UserDO usernameUserDO) {
                                        // 如果用户名已经使用(该用户名对应的 id 不是自己,说明就已经被使用了)
                                        if (usernameUserDO != USER_NULL && !Objects.equals(updateDTO.getId(), usernameUserDO.getId())) {
                                            return Mono.just(false);
                                        }
                                        // 执行更新
                                        userDO.setUsername(updateDTO.getUsername());
                                        userDO.setPassword(updateDTO.getPassword());
                                        return userRepository.save(userDO).map(userDO -> true); // 返回 true 成功
                                    }

                                });
                    }

                });
    }

    /**
     * 删除指定用户编号的用户
     *
     * @param id 用户编号
     * @return 是否删除成功
     */
    @PostMapping("/delete") // URL 修改成 /delete ,RequestMethod 改成 DELETE
    public Mono<Boolean> delete(@RequestParam("id") Integer id) {
        // 查询用户
        Mono<UserDO> user = userRepository.findById(id);

        // 执行删除。这里仅仅是示例,项目中不要物理删除,而是标记删除
        return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                .flatMap(new Function<UserDO, Mono<Boolean>>() {

                    @Override
                    public Mono<Boolean> apply(UserDO userDO) {
                        // 如果不存在该用户,则直接返回 false 失败
                        if (userDO == USER_NULL) {
                            return Mono.just(false);
                        }
                        // 执行删除
                        return userRepository.deleteById(id).map(aVoid -> true); // 返回 true 成功
                    }

                });
    }

}

???? 有一点要注意,艿艿在使用的版本,Spring Data Elasticsearch 在使用响应式的模式下,插入数据时,不会自动创建索引。所以,需要胖友手动在 Elastcisearch 中,创建 UserDO 对应的 users 索引。

12. 整合响应式的 JPA

这是一个悲伤的消息,Spring Data 暂时无法提供响应式的 Spring Data JPA 。这特喵的很严重呀!因为对于我们来说,日常开发最重要的,就是操作 MySQL、Oracle、PostgreSQL、SQLServer 等关系数据库。

这是为什么呢?!

  • 问题一,Spring Data JPA 基于 JDBC 实现对数据库的操作,而 JDBC 提供的是阻塞同步的 API 方法。
  • 问题二,MySQL、Oracle 等关系数据库是 BIO 模型,一个客户端的 Connection 对应到服务端就是一个线程。这样,就导致 MySQL、Oracle 无法建立大量的客户端连接了。

对于问题一,目前 Oracle 提出了 ADBA(Asynchronous Database Access API) ,而社区提出了 r2dbc(Reactive Relational Database Connectivity) ,希望提供异步非阻塞的 API ,访问数据库。具体的,胖友可以阅读如下两篇文章:

这样,虽然 MySQL、Oracle 服务器是 BIO 的模型,至少我们在客户端有异步非阻塞的 API 可以调用。只是说,辛苦数据库服务器老哥了,嘿嘿。想要尝鲜的胖友,可以看看 Spring Data R2DBC 项目。???? 看看就好,目前还处于实验阶段哈~这里推荐看看:

对于问题二,需要数据库自身将 BIO 改造成 NIO 的方式,提供支撑大量客户端连接的能力。例如说,国产之光 TiDB 就是基于 NIO 的方式。

当然,还有一种方式,提供数据库 Proxy 代理服务器,提供 NIO 建立连接的方式。这样,即使数据库服务器是 BIO 的方式,Proxy 可以认为是一个数据库的连接池,提供支撑更多的客户端的链接的能力。例如说,Sharding Sphere 提供了 Sharding Proxy ,基于 NIO 的方式实现,可以支持作为 MySQL、PostgreSQL 的 Proxy 服务器。

13. 整合响应式的 R2DBC 和事务

示例代码对应仓库:lab-27-webflux-r2dbc 。

我们知道,JDBC 提供的是同步阻塞的数据库访问 API ,那么显然无法在响应式编程中使用,所以 WebFlux 也无法去使用。于是乎 R2DBC 诞生了。

FROM https://r2dbc.io/

R2DBC (Reactive Relational Database Connectivity) is an endeavor to bring a reactive programming API to SQL databases.

R2DBC(响应式的关系数据库连接),是一种将响应式 API 引入 SQL 数据库的尝试。

我们可以简单将其理解成响应式的 JDBC 的异步非阻塞 API 。目前其有多种驱动实现,本小节我们会使用 jasync-sql 来访问 MySQL 数据库。

FROM https://github.com/jasync-sql/jasync-sql

jasync-sql is a Simple, Netty based, asynchronous, performant and reliable database drivers for PostgreSQL and MySQL written in Kotlin.

  • 基于 Netty 实现
  • 异步、高性能、可靠的 PostgreSQL、MySQL 的驱动
  • 使用 Kotlin 语言编写

在 Spring Framework 5.2 M2 版本,Spring 提供了 ReactiveTransactionManager 响应式的事务管理器。得益于此,我们可以在响应式变成中,使用 @Transactional 注解来实现声明式事务,又或者使用 TransactionalOperator来实现编程式事务。本小节,我们会使用 @Transaction 注解来实现声明式事务,毕竟项目中比较少使用编程式事务。

直接使用 JDBC 访问数据库,编写 CRUD 是很繁琐,同理 R2DBC 也是。所以强大的 Spring Data 提供了 Spring Data R2DBC 库,方便我们使用 R2DBC 开发。

下面,让我们来整合 Spring Data R2DBC 到 WebFlux 中,实现响应式的 CRUD 操作。然后实现用户的增删改查接口。接口列表如下:

请求方法 URL 功能
GET /users/list 查询用户列表
GET /users/get 获得指定用户编号的用户
POST /users/add 添加用户
POST /users/update 更新指定用户编号的用户
POST /users/delete 删除指定用户编号的用户

因为使用的是 Spring Data R2DBC ,所以和 「9. 集成响应式的 MongoDB」 是非常类似的。

13.1 引入依赖

在 pom.xml 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-27-webflux-r2dbc</artifactId>

    <dependencies>
        <!-- 实现对 Spring WebFlux 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>

        <!-- 自动化配置响应式的 Spring Data R2DBC -->
        <dependency>
            <groupId>org.springframework.boot.experimental</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
            <version>0.1.0.M2</version>
        </dependency>

        <!-- jasync 的 r2dbc-mysql 驱动 -->
        <dependency>
            <groupId>com.github.jasync-sql</groupId>
            <artifactId>jasync-r2dbc-mysql</artifactId>
            <version>1.0.11</version>
        </dependency>

        <!-- 方便等会写单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <repositories>
        <!-- 引入 Spring 的快照仓库 -->
        <repository>
            <id>spring-libs-snapshot</id>
            <url>https://repo.spring.io/libs-snapshot</url>
        </repository>
        <!-- 引入 Jcenter 的快照仓库 -->
        <repository>
            <id>jcenter</id>
            <url>https://jcenter.bintray.com/</url>
        </repository>
    </repositories>

</project>

13.2 应用配置文件

在 application.yml 中,添加 R2DBC 配置,如下:

spring:
  # R2DBC 配置,对应 R2dbcProperties 配置类
  r2dbc:
    url: mysql://47.112.193.81:3306/lab-27-webflux-r2dbc
    username: lab-27-webflux-r2dbc
    password: aaa@qq.com

13.3 DatabaseConfiguration

在 cn.iocoder.springboot.lab27.springwebflux.config 包路径下,创建 DatabaseConfiguration 配置类。代码如下:

// DatabaseConfiguration.java

@Configuration
@EnableTransactionManagement // 开启事务的支持
public class DatabaseConfiguration {

    @Bean
    public ConnectionFactory connectionFactory(R2dbcProperties properties) throws URISyntaxException {
        // 从 R2dbcProperties 中,解析出 host、port、database
        URI uri = new URI(properties.getUrl());
        String host = uri.getHost();
        int port = uri.getPort();
        String database = uri.getPath().substring(1); // 去掉首位的 / 斜杠
        // 创建 jasync Configuration 配置配置对象
        com.github.jasync.sql.db.Configuration configuration = new com.github.jasync.sql.db.Configuration(
                properties.getUsername(), host, port, properties.getPassword(), database);
        // 创建 JasyncConnectionFactory 对象
        return  new JasyncConnectionFactory(new MySQLConnectionFactory(configuration));
    }

    @Bean
    public ReactiveTransactionManager transactionManager(R2dbcProperties properties) throws URISyntaxException {
        return new R2dbcTransactionManager(this.connectionFactory(properties));
    }

}
  • 通过 @EnableTransactionManagement 注解,开启 Spring Transaction 的支持。
  • #connectionFactory(properties) 方法,创建 JasyncConnectionFactory Bean 对象。因为 spring-boot-starter-data-r2dbc 支持 R2DBC 的自动化配置,但是暂不支持自动创建 JasyncConnectionFactory 作为 ConnectionFactory Bean ,所以这里我们需要自定义。
  • #transactionManager(properties) 方法,创建响应式的 R2dbcTransactionManager 事务管理器。

13.4 Application

创建 Application.java 类,配置 @SpringBootApplication 注解即可。代码如下:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

13.5 UserRepository

在 cn.iocoder.springboot.lab27.springwebflux.dao 包路径下,创建 UserRepository 接口。代码如下:

// UserRepository.java

public interface UserRepository extends ReactiveCrudRepository<UserDO, Integer> {

    @Query("SELECT id, username, password, create_time FROM users WHERE username = :username")
    Mono<UserDO> findByUsername(String username);

}
  • 对应的实体为 UserDO.java 类。对应建表语句见 users.sql 文件。

  • 实现 ReactiveCrudRepository 接口,它是响应式的 Repository 基础接口。

  • #findByUsername(String username) 方法,定义了查询指定用户名的用户,返回的结果也算是使用 Mono 包装过。

    • 注意,这里的 @Query 注解是 Spring Data R2DBC 自定义的,而不是 JPA 规范中的。
    • 如果我们注释掉 @Query 注解,启动项目会报 "Query derivation not yet supported!" 异常提示。目前看下来,Spring Data R2DBC 暂时不支持【基于方法名查询】。

13.6 UserController

在 cn.iocoder.springboot.lab27.springwebflux.controller 包路径下,创建 UserController 类。代码如下:

// UserController.java

@RestController
@RequestMapping("/users")
public class UserController {

    private static final UserDO USER_NULL = new UserDO();

    @Autowired
    private UserRepository userRepository;

    /**
     * 查询用户列表
     *
     * @return 用户列表
     */
    @GetMapping("/list")
    public Flux<UserVO> list() {
        // 返回列表
        return userRepository.findAll()
                .map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */
    @GetMapping("/get")
    public Mono<UserVO> get(@RequestParam("id") Integer id) {
        // 返回
        return userRepository.findById(id)
                .map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));
    }

    /**
     * 添加用户
     *
     * @param addDTO 添加用户信息 DTO
     * @return 添加成功的用户编号
     */
    @PostMapping("add")
    @Transactional
    public Mono<Integer> add(UserAddDTO addDTO) {
        // 查询用户
        Mono<UserDO> user = userRepository.findByUsername(addDTO.getUsername());

        // 执行插入
        return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                .flatMap(new Function<UserDO, Mono<Integer>>() {

                    @Override
                    public Mono<Integer> apply(UserDO userDO) {
                        if (userDO != USER_NULL) {
                            // 返回 -1 表示插入失败。
                            // 实际上,一般是抛出 ServiceException 异常。因为这个示例项目里暂时没做全局异常的定义,所以暂时返回 -1 啦
                            return Mono.just(-1);
                        }
                        // 将 addDTO 转成 UserDO
                        userDO = new UserDO()
                                .setUsername(addDTO.getUsername())
                                .setPassword(addDTO.getPassword())
                                .setCreateTime(new Date());
                        // 插入数据库
                        return userRepository.save(userDO).flatMap(new Function<UserDO, Mono<Integer>>() {
                            @Override
                            public Mono<Integer> apply(UserDO userDO) {
                                // 如果编号为偶数,抛出异常。
                                if (userDO.getId() % 2 == 0) {
                                    throw new RuntimeException("我就是故意抛出一个异常,测试下事务回滚");
                                }

                                // 返回编号
                                return Mono.just(userDO.getId());
                            }
                        });
                    }

                });
    }

    /**
     * 更新指定用户编号的用户
     *
     * @param updateDTO 更新用户信息 DTO
     * @return 是否修改成功
     */
    @PostMapping("/update")
    public Mono<Boolean> update(UserUpdateDTO updateDTO) {
        // 查询用户
        Mono<UserDO> user = userRepository.findById(updateDTO.getId());

        // 执行更新
        return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                .flatMap(new Function<UserDO, Mono<Boolean>>() {

                    @Override
                    public Mono<Boolean> apply(UserDO userDO) {
                        // 如果不存在该用户,则直接返回 false 失败
                        if (userDO == USER_NULL) {
                            return Mono.just(false);
                        }
                        // 查询用户是否存在
                        return userRepository.findByUsername(updateDTO.getUsername())
                                .defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                                .flatMap(new Function<UserDO, Mono<? extends Boolean>>() {

                                    @Override
                                    public Mono<? extends Boolean> apply(UserDO usernameUserDO) {
                                        // 如果用户名已经使用(该用户名对应的 id 不是自己,说明就已经被使用了)
                                        if (usernameUserDO != USER_NULL && !Objects.equals(updateDTO.getId(), usernameUserDO.getId())) {
                                            return Mono.just(false);
                                        }
                                        // 执行更新
                                        userDO.setUsername(updateDTO.getUsername());
                                        userDO.setPassword(updateDTO.getPassword());
                                        return userRepository.save(userDO).map(userDO -> true); // 返回 true 成功
                                    }

                                });
                    }

                });
    }

    /**
     * 删除指定用户编号的用户
     *
     * @param id 用户编号
     * @return 是否删除成功
     */
    @PostMapping("/delete") // URL 修改成 /delete ,RequestMethod 改成 DELETE
    public Mono<Boolean> delete(@RequestParam("id") Integer id) {
        // 查询用户
        Mono<UserDO> user = userRepository.findById(id);

        // 执行删除。这里仅仅是示例,项目中不要物理删除,而是标记删除
        return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走
                .flatMap(new Function<UserDO, Mono<Boolean>>() {

                    @Override
                    public Mono<Boolean> apply(UserDO userDO) {
                        // 如果不存在该用户,则直接返回 false 失败
                        if (userDO == USER_NULL) {
                            return Mono.just(false);
                        }
                        // 执行删除
                        return userRepository.deleteById(id).map(aVoid -> true); // 返回 true 成功
                    }

                });
    }

}

注意,我们在 #add(UserAddDTO addDTO) 方法上,添加了 @Transactional 注解,表示整个逻辑需要在事务中进行。同时,为了模拟事务回滚的情况,我们故意在插入记录的 id 编号为偶数时,抛出 RuntimeException 异常,回滚事务。???? 艿艿自己已经测试,确实事务可以回滚。

更多内容,胖友可以看看 《Spring Data R2DBC - Reference Documentation》 文档。

14. 其他内容

WebFlux 基本带来了全新的技术体系,所以涉及的内容,其实非常多,本文并未全部覆盖到。艿艿建议,可以抽时间再把 《Web on Reactive Stack》 文档阅读下。

如下内容,胖友可以找时间看看。

至此,我们已经完成了 Spring WebFlux 的简单入门。如果用一句简单的话来概括 WebFlux 的话,那就是:

  • WebFlux 在 Spring Framework 5 推出的,以 Reactor 库为基础,基于异步和事件驱动,实现的响应式 Web 开发框架。
  • WebFlux 能够充分利用多核 CPU 的硬件资源,处理大量的并发请求。因此,可以在不扩充硬件的资源的情况下,提升系统的吞吐性和伸缩性。

注意,这里我们提到的是吞吐性和伸缩性,而不是提升每个请求的性能。我们来回想下整个 WebFlux 的执行过程:请求是被作为一个事件丢到线程池中执行,等到执行完毕,异步回调结果给主线程,最后返回给前端。

那么整个过程,相比 SpringMVC 的执行过程来说,至少多了一次线程的上下文切换。我们都知道,线程的切换是有成本的。所以,单看一个请求的处理,SpringMVC 的性能是优于 WebFlux 的。

我们上文提到的主线程,一般来说就是 IO 线程。

但是,由于 WebFlux 的 IO 线程是非阻塞的,可以不断解析请求,丢到线程池中执行。而 SpringMVC 的 IO 线程是阻塞的,需要等到请求被处理完毕,才能解析下一个请求并进行处理。这样,随着每个请求的被处理时间越长、并发请求的量级越大,WebFlux 相比 SpringMVC 的整体吞吐量高的越多,平均的请求响应时间越短。如下图所示:

Spring Boot WebFlux 入门

从图中,我们可以看到,随着并发请求量的增大,WebFlux 的响应时间平稳在 100ms 左右,而 SpringMVC 的响应式时间从 3000 并发量开始,响应时间直线上升。???? 感兴趣的胖友,可以参考如下文章,自己做一波性能的基准测试:

那么什么场景下的服务,适合使用 WebFlux 呢?我们可以把任务分成 IO 密集型和 CPU 密集型,而服务本质上,是执行一个又一个的任务,所以也可以这么分。???? 不了解 IO 密集型和 CPU 密集型的胖友,可以先看下 《计算密集型和 IO 密集型》 文章。

而我们业务中编写的代码,都无一幸免需要跟 MySQL、MongoDB、Elasticsearch 等数据库打交道,又或者跟 Redis、Memcached 等缓存服务打交道,还或者需要跟 RocketMQ、RabbitMQ、Kafka 等消息队列打交道。无论这些中间件做的多牛逼,性能多么掉渣天,我们都无法避免会经过网络 IO 和磁盘 IO 。所以,我们提供的服务,大多数都是 IO 密集型。很少会存在,直接从内存读取数据,直接返回的情况。

**因此,我们业务中编写的代码,绝大多多多数都是 IO 密集型,都是适合使用 WebFlux 的。**但是,响应式编程对开发人员的编码能力要求会比较高,一旦脑子一抽,在 IO 线程中编写了阻塞代码,反倒出现性能下滑。具体可以看看艿艿在 《性能测试 —— SpringMVC、Webflux 基准测试》 提供的测试示例,明明白白的。

艿艿建议的话,如果考虑使用 WebFlux 的话,一定要把 Reactor 好好学习下,不然真的是做厮大发好。同时,每次上线之前,对使用 WebFlux 编写的服务,做下性能测试,可以发现编写不正确的地方,找到阻塞 IO 线程的逻辑。

目前,暂时找不到大规模使用 WebFlux 的业务开源项目,最大使用 WebFlux 构建的开源项目,就是 Spring Cloud 开源的网关 Spring Cloud Gateway 。???? 可能,WebFlux 或者响应式编程最好的归宿,暂时是中间件。如果胖友有看过 Dubbo 的线程模型,就会发现和 WebFlux 是异曲同工之妙。

OK ,哔哔结束~如果胖友想要进一步了解 WebFlux 的话,不烦看看 Spring Cloud Gateway 的源码,可以看看艿艿写的 《芋道 Spring Cloud Gateway 源码解析》 。

相关标签: Spring boot