欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

WebFlux服务端开发

程序员文章站 2022-04-30 20:00:36
...

一.SpringWebFlux基础知识

1.概念

  • Spring5提出的新的开发Web的技术栈,非阻塞的开发模式,运行在netty或servlet3.1上,支持很高的并发量
  • 非阻塞的概念
    • WebFlux一个线程里可以处理更多的请求
    • 老的开发模式:一个请求会对应容器里的一个线程
  • 运行环境的不同
    • 老的开发模式:基于ServletAPI,即运行在Servlet容器上面
    • Webflux开发模式:基于响应式流,可以运行在Servlet3.1之后的容器(异步Servlet容器)或Netty上
  • 数据库的不同
    • 目前关系型数据库都是不支持响应式(基于JDBC)的数据库
    • Reactive stack的SpringDataReactiveRepositories是:Mongo,Cassandra,Redis,Couchbase

2. 开发优势

  • 支持高并发(异步非阻塞模式——垂直扩展)
    【题外知识:扩展分为两部分水平扩展和垂直扩展,水平扩展指的是人员和硬件设备的增加,垂直扩展只的是技术栈的变更】

二.异步Servlet

1.为什么要使用异步Servlet?同步Servlet阻塞了什么?

  • 同步Servlet代码
@WebServlet("/SyncServlet")
public class SyncServlet extends HttpServlet {
    public SyncServlet() {
        super();
    }
    protected void doGet(HttpServletRequest request, HttpServletResponse) {
        //获取进入时时间
        long t1 = System.currectTimeMillis();
        //执行业务代码
        doSomeThing(request, response);
        //打印执行耗时
        System.out.println("Sync use:"+(System.currectTimeMillis()-t1));
    }
    protected void doPost(HttpServletRequest request, HttpServletResponse) {
        doGet(request, response);
    }
    //耗时操作执行逻辑
    public void doSomeThing(HttpServletRequest request, HttpServletResponse) {
        //模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);//延迟5秒
        } catch(InterruptedException e) {}
        response.getWriter().append("done");
    }
}
//访问请求大约耗时5s
  • 同步Servlet阻塞了Tomcat容器的servlet线程。
  • 请求执行流程:当网络请求发送到Tomcat容器之后,Tomcat容器会给每一个请求创建一个线程去处理,线程里会调用指定Servlet去处理。当使用同步Servlet时,业务代码花多长时间,Servlet代码就要等多长时间。
  • 异步Servlet代码[不会阻塞Tomcat的Servlet线程]
@WebServlet(asyncSupported=true, urlPatterns={"/AsyncServlet"})
public class AsyncServlet extends HttpServlet {
    public AsyncServlet() {
        super();
    }
    protected void doGet(HttpServletRequest request, HttpServletResponse) {
        //获取进入时时间
        long t1 = System.currectTimeMillis();
        //开启异步,获得异步执行的上下文
        AsyncContext asyncContext = request.startAsync();
        //使用JDK8的CompletableFuture.runAsync(()->...);的方式执行异步操作在不同线程里处理[执行业务代码,传递异步执行的上下文请求及响应]
        CompletableFuture.runAsync(()->doSomeThing(asyncContext,asyncContext.getRequest(), asyncContext.getResponse()));
        //打印执行耗时
        System.out.println("Async use:"+(System.currectTimeMillis()-t1));
    }
    protected void doPost(HttpServletRequest request, HttpServletResponse) {
        doGet(request, response);
    }
    //耗时操作执行逻辑
    public void doSomeThing(AsyncContext asyncContext,ServletRequest request, ServletResponse) {
        //模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);//延迟5秒
        } catch(InterruptedException e) {}
        response.getWriter().append("done");
        //业务代码处理完毕,通知结束
        asyncContext.complete();
    }
}
//访问请求大约耗时16ms

2.异步Servlet怎样编写

  • 开启异步上下文
  • 将异步代码放到独立的线程池中执行
  • 调用异步上下文的complete()方法通知结束

三.WebFlux开发

1.Mono对象实战开发

  • 添加Reactive Web依赖

  • 实例代码

    @RestController
    public class TestController {
        @GetMapping("/1")
        private String get1() {
            return "some string";
        }
        @GetMapping("/2")
        private Mono<String> get2() {
            Mono<String>result = Mono.fromSupplier(()->"some string");
            //返回Mono不会阻塞线程,如果是耗时操作会异步执行
            return result;
        }
    }
    
    • Mono对象是Reactor里面的对象

      • reactor=jdk8 stream+jdk9 reactive stream
      • Mono 0-1个元素(序列)
      • Flux 0-N个元素(序列)
    • 实例演示

      String[] strs = {"1","2","3"};
      //定义订阅者
      Subscriber<Integer> subscriber = new Subscriber<Integer>() {
          private Subscription subscription;
          @Override
          public void onSubscribe(Subscription subscription) {
              //保存订阅关系,需要用它来给发布者响应
              this.subscription = subscription;
              //请求一个数据
              this.subscription.request(1);
          }
          @Override
          public void onNext(Integer item) {
              //接受一个数据,处理
              System.out.println("接收到数据:"+item);
              //处理完调用request再请求一个数据
              this.subscription.request(1);
              //或者 已经达到了目标,调用cancel告诉发布者不再接受数据了
              //this.subscription.cancel();
          }
          @Override
          public void onError(Throwable throwable) {
              //出现了异常(例如处理数据的时候产生了异常)
              throwable.printStackTrace();
              //我们可以告诉发布者,后面不接受数据了
              this.subscription.cancel();
          }
          @Override
          public void onComplete() {
              //全部数据处理完了(发布者关闭了)
              System.out.println("处理完了!");
          }
      }
      
      //这里是jdk8的stream
      Flux.fromArray(strs).map(s->Integer.parseInt(s))
          //最终操作是订阅,即jdk9的reactive stream
          .subscribe(subscriber);
      
      //输出结果:
      //接收到数据:1
      //接收到数据:2
      //接收到数据:3
      //处理完了!
      

2.Flux对象实战开发

  • 实例代码

    @RestController
    public class TestController {
        /**
         * Flux:返回0-n个元素
         * 返回的属性produces需要指定成流的形式
         */
        @GetMapping(value="/3", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
        private Flux<String> flux() {
            Flux<String>result = Flux.fromStream(IntStream.range(1,5).mapToObj(i->{
                try{
                    TimeUnit.SECONDS.sleep(1);
                }catch(InterruptedException e) {}
                return "flux data--"+i;
            }));
            return result;
        }
    }
    //页面输出:
    //flux data--1
    //[隔1秒后页面继续打印]
    //flux data--2
    //[隔1秒后页面继续打印]
    //flux data--3
    //[隔1秒后页面继续打印]
    //flux data--4
    

四.SSE(Server-Sent Events)

http是一问一答的形式,可是Flux是如何做到一次请求页面一点一点输出的呢?通过的就是H5的SSE。

1.实例代码

  • 服务端Servlet处理逻辑【向页面上间隔1秒输出信息】
protected void doGet(HttpServletRequest request, HttpServletResponse response) {
    //设置SSE是必须要设置ContentType和CharacterEncoding
    response.setContentType("text/event-stream");
    response.setCharacterEncoding("utf-8");
    for(int i=0;i<5;i++){
        response.getWriter().write("data:"+i+"\n\n");
        response.getWriter().flush();
        try {
        TimeUtil.SENCONDS.sleep(1);
        } catch(InterruptedException e){}
    }
}
  • 前台代码
<html>
    <!-- 此处省略其他设置 -->
    <script type="text/javascript">
    	//初始化,参数为url[即后台API地址]
        var sse = new EventSource("SSE");
        
        sse.onmessage = function(e) {
            console.log("message",e.data,e);//将数据打印出来,数据在方法参数中
        }
        // SSE有自动重连的特性,即输出一圈 data:0/1/2/3/4 后还会再输出一圈
        // 如果想停止自动重连则需要在function中添加判断
    </script>
</html>

五.响应式微服务实例开发

1.环境搭建

  • 在https://start.spring.io中引入ReativeMongoDB和SpringReactiveWeb[包含WebFlux]依赖,并下载项目
  • 在Application.java中引入启动MongoDB的注解@EnableReactiveMongoRepositories

2.MongoDB连接初试

  • 添加Domain类

    @Document(collection="user")//表示对应的MongoDB中的表名
    @Data//lambok注解,自动编写Set/Get/toString,不需人为添加
    public class User {
        @Id//定义Id,MongoDB中id一般都是字符串类型
        private String id;
        
        private String name;
        
        private int age;
    }
    
  • 编写仓库操作接口代码

    @Repository
    public interface UserRepository extends ReactiveMongoRepository<User, String> {
        //编写根据年龄查找用户,Springboot JPA会自动帮助生成对应的SQL
        public Flux<User> findByAgeBetween(int start, int end);
        
        //也可以通过@Query注解编写MongoDB的查询语句
        @Query("{'age':{'$gte':20,'$lte':30}}")//表示大于等于20和小于等于30
        public oldUser();
    }
    
  • 编写Controller

    @RestController
    @RequestMapping("/user")
    public class UserController {
        private final UserRepository repository;
        
        public class UserController(UserRepository repository) {
            this.repository = repository;
        }
        
        /**
        * 以数组形式一次返回数据
        */
        @GetMapping("/")
        public Flux<User> getAll() {//直接返回
            //引入仓库
            return repository.findAll();
        }
        
        /**
        * 以SSE形式多次返回数据
        */
        @GetMapping(value="/stream/all", produces=MediaType.TEXT_EVENT_STREAM_VALUE)//以流式返回
        public Flux<User> streamGetAll() {//流式返回
            //引入仓库
            return repository.findAll();
        }
        
        /**
        * 新增数据,返回新增的数据信息
        */
        @PostMapping("/")
        public Mono<User> createUser(@RequestBody User user) {
            //spring data jpa里面,新增和修改都是save,有id是修改,id为空是新增
            //根据实际情况是否置空id user.setId(null);
            return this.repository.save(user);
        }
        
        /**
        * 根据id删除用户
        * 存在的时候返回200,不存在返回404
        */
        @DeleteMapping("/{id}")
        public Mono<ResponseEntity<Void>> deleteUser (@PathVariable("id")String id) {
            // this.responsity.deleteById(id) 返回Mono<Void>没有返回值,不能判断数据是否存在
            return this.respository.findById(id)
                //当要操作数据并返回Mono时使用flatMap
                //如果不操作数据,只是对数据进行转换,使用map
                .flatMap(user->this.responsitory.delete(user)
                         .then(Mono.just(new ResponseEntity<>(HttpStatus.OK))))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
        }
        
        /**
        * 修改数据
        * 存的时候返回200,修改后的数据不存在时返回404
        */
        @PutMapping("/{id}")
        public Mono<ResponseEntity<User>> updateUser(
        	@PathVariable("id")String id,
            @RequestBody User user
        	) {
            return this.respository.findById(id)
                //操作数据,操作返回Mono
                .flatMap(u->{
                    u.setAge(user.getAge());
                    u.setName(user.getName());
                    return this.repository.save(u);//没有直接发送this.repository.save(user)的原因是不知道传过来的User对象会不会带id,如果没带id则会执行插入
                })
                //转换数据
                .map(u->new ResponseEntity<User>(u.HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
        }
        
        /**
        * 根据id查找用户
        * 存在返回用户信息,不存在返回404
        */
        @GetMapping("/{id}")
        public Mono<ResponseEntity<User>> findUserById(
        	@PathVariable("id") String id
        	){
            return this.respository.findById(id)
                .map(u-> new ResponseEntity<User>(u, HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
        }
        
        /**
        * 根据年龄查找用户
        */
        @GetMapping("/age/{start}/{end}")
        public Flux<User> findByAge(@PathVariable("start") int start, @PathVariable("end") int end) {
            return this.repository.findByAgeBetween(start,end);
        }
    }
    
  • 下载MongoDB并启动[默认端口是27017]

  • 在properties中配置数据库

    #配置数据库
    spring.data.mongodb.uri=mongodb://localhost:27017/webfluxDB
    

3.添加参数校验部分

  • 在Domain.java类中添加注解进行参数校验

    @Document(collection="user")//表示对应的MongoDB中的表名
    @Data//lambok注解,自动编写Set/Get/toString,不需人为添加
    public class User {
        @Id//定义Id,MongoDB中id一般都是字符串类型
        private String id;
        @NotBlank//非空
        private String name;
        @Range(min=10, max=20)//设置属性值区间
        private int age;
    }
    
  • 在Controller接收的API处使用@Vaild注解,如下

     /**
      * 新增数据,返回新增的数据信息
      */
    @PostMapping("/")
    public Mono<User> createUser(@Vaild @RequestBody User user) {
        return this.repository.save(user);
    }
    
  • 添加一个Controller的切面,用于当参数不通过的时候抛出异常

    /**
     * 异常处理切面
     */
    @ControllerAdvice
    public class CheckAdvice {
        //用于处理异常
        @ExceptionHandler(WebExchangeBindException.class)
        public ResponseEntity handleBindException(WebExchangeBindException e) {
            return new ResponseEntity<String>(toStr(e), HttpStatus.BAD_REQUEST);
        }
        
        //将校验异常转成字符串
        public String toStr(WebExchangeBindException e) {
            return e.getFieldErrors().stream()//获得异常的字段数组
                .map(e->e.getField()+":"+e.getDefaultMessage());//将异常对象数组转成字符串数组
            	.reduce("",(s1,s2)->s1+"\n"+s2);//将字符串数组reduce拼接
        }
    }
    

六.RouterFunction模式实例开发

使用RouterFunction模式开发所有的Controller长得都一样,都是输入ServerRequest,返回ServerResponse

1.编写HandlerFunction(即Controller层)

  • 编写UserHandler

    @Component
    public class UserHandler {
        private final UserRepository repository;
        public UserHandler(UserRepository repository) {
            this.repository = repository;
        }
        /**
         * 得到所有用户
         */
        public Mono<ServerResponse> getAllUser(ServerRequest request) {
            //.ok()表示返回的状态码是200
            //.contentType()表示返回的ContentType内容
            //.body()表示响应体内容
            return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(this.repository.findAll(),User.class);
        }
        
        /**
         * 创建用户
         */
        public Mono<ServerResponse> createUser(ServerRequest request) {
            Mono<User> user = request.bodyToMono(User.class);//通过bodyToMono将请求中提交的数据转换成Mono<User>类型
            //.ok()表示返回的状态码是200
            //.contentType()表示返回的ContentType内容
            //.body()表示响应体内容
            return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(this.repository.saveAll(user),User.class);
        }
        
        /**
         * 根据id删除用户
         */
        public Mono<ServerResponse> deleteUserById (ServerRequest request) {
            String id = request.pathVariable("id");//获取request请求对象中的PathVariable的值
            return this.repository.findById(id)
                .flatMap(user->this.repository.delete(user)
                         .then(ServerResponse.ok().build()))
                .switchIfEmpty(ServerResponse.notFound().build());
        }
    }
    

2.编写RouterFunction

RouterFunction的作用是将一个url与一个HandleFunction对应起来

  • 编写AllRouters类做RouterFunction功能

    @Configuration
    public class AllRouters {
        //RouterFunction是将url与handleFunction对应起来
        @Bean
        RouterFunction<ServerResponse> userRouter(UserHandler handler) {//通过参数的形式传入对应的Handler类
            //RequestPredicates.path("/url")的方式给出统一的url前缀
            //当访问/user/接口时会将请求发送到UserHandler的getAllUser()方法上
            return RouterFunction.nest(
            	RequestPredicates.path("/user"),//相当于类上的RequestMapping
                //使用router定义方法上的RequestMapping和对应的方法
                RouterFunctions.route(RequestPredicates.GET("/"),hander::getAllUser)
                //使用router可以配置多个路由
                .andRoute(
                	RequestPredicates.POST("/")
                    .and(accept(MediaType.APPLICATION_JSON_UTF8)),
                    handler::createUser
                )
                //删除用户
                .andRoute(
                	RequestPredicates.DELETE("/{id}"),
                    handler::deleteUserById
                );
            );
        }
    }
    

3.编写ExceptionHandler异常处理类

  • 实例代码

    @Component
    @Order(-2)//默认有很多Handler的实现类,需要把优先级调高,否则不会工作[数值越小,优先级越高]
    public class ExceptionHandler implements WebExceptionHandler {
        @Override
        public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
            ServerHttpResponse response = exchange.getResponse();
            //设置响应头400
            response.setStatusCode(HttpStatus.BAD_REQUEST);
            //设置返回类型
            response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
            //异常信息
            String errorMsg = toStr(ex);
            DataBuffer db = response.bufferFactory().wrap(errorMsg.getBytes());
            return response.writeWith(Mono.just(db));
        }
        
        private String toStr(Throwable ex) {
            //已知异常
            if(ex instanceof CheckException) {
                CheckException e = (CheckException) ex;
                return e.getFieldName() + ": invalid value "+e.getFieldValue();
            }
            //未知异常,需要打印堆栈,方便定位
            else {
                ex.printStackTrace();
                return ex.toString();
            }
        }
    }