Spring Boot中使用RSocket的示例代码
1. 概述
rsocket 应用层协议支持 reactive streams 语义, 例如:用rsocket作为http的一种替代方案。在本教程中, 我们将看到 rsocket 用在spring boot中,特别是spring boot 如何帮助抽象出更低级别的rsocket api。
2. 依赖
让我们从添加 spring-boot-starter-rsocket 依赖开始:
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-rsocket</artifactid> </dependency>
这个依赖会传递性的拉取 rsocket 相关的依赖,比如: rsocket-core 和 rsocket-transport-netty
3.示例的应用程序
现在继续我们的简单应用程序。为了突出 rsocket 提供的交互模式,我打算创建一个交易应用程序, 交易应用程序包括客户端和服务器。
3.1. 服务器设置
首先,我们设置由springboot应用程序引导的 rsocket server 服务器。 因为有 spring-boot-starter-rsocket dependency 依赖,所以springboot会自动配置 rsocket server 。 跟平常一样, 可以用属性驱动的方式修改 rsocket server 默认配置值。例如:通过增加如下配置在 application.properties 中,来修改 rsocket 端口
spring.rsocket.server.port=7000
也可以根据需要进一步修改服务器的其他属性
3.2.设置客户端
接下来,我们来设置客户端,也是一个springboot应用程序。虽然springboot自动配置大部分rsocket相关的组件,但还要自定义一些对象来完成设置。
@configuration public class clientconfiguration { @bean public rsocket rsocket() { return rsocketfactory .connect() .mimetype(mimetypeutils.application_json_value, mimetypeutils.application_json_value) .framedecoder(payloaddecoder.zero_copy) .transport(tcpclienttransport.create(7000)) .start() .block(); } @bean rsocketrequester rsocketrequester(rsocketstrategies rsocketstrategies) { return rsocketrequester.wrap(rsocket(), mimetypeutils.application_json, rsocketstrategies); } }
这儿我们正在创建 rsocket 客户端并且配置tcp端口为:7000。注意: 该服务端口我们在前面已经配置过。 接下来我们定义了一个rsocket的装饰器对象 rsocketrequester 。 这个对象在我们跟 rsocket server 交互时会为我们提供帮助。 定义这些对象配置后,我们还只是有了一个骨架。在接下来,我们将暴露不同的交互模式, 并看看springboot在这个地方提供帮助的。
4. springboot rsocket 中的 request/response
我们从 request/response 开始, http 也使用这种通信方式,这也是最常见的、最相似的交互模式。 在这种交互模式里, 由客户端初始化通信并发送一个请求。之后,服务器端执行操作并返回一个响应给客户端--这时通信完成。 在我们的交易应用程序里, 一个客户端询问一个给定的股票的当前的市场数据。 作为回复,服务器会传递请求的数据。
4.1.服务器
在服务器这边,我们首先应该创建一个 controller 来持有我们的处理器方法。 我们会使用 @messagemapping 注解来代替像springmvc中的 @requestmapping 或者 @getmapping 注解
@controller public class marketdatarsocketcontroller { private final marketdatarepository marketdatarepository; public marketdatarsocketcontroller(marketdatarepository marketdatarepository) { this.marketdatarepository = marketdatarepository; } @messagemapping("currentmarketdata") public mono<marketdata> currentmarketdata(marketdatarequest marketdatarequest) { return marketdatarepository.getone(marketdatarequest.getstock()); } }
来研究下我们的控制器。 我们将使用 @controller 注解来定义一个控制器来处理进入rsocket的请求。 另外,注解 @messagemapping 让我们定义我们感兴趣的路由和如何响应一个请求。 在这个示例中, 服务器监听路由 currentmarketdata , 并响应一个单一的结果 mono<marketdata> 给客户端。
4.2. 客户端
接下来, 我们的rsocket客户端应该询问一只股票的价格并得到一个单一的响应。 为了初始化请求, 我们该使用 rsocketrequester 类,如下:
@restcontroller public class marketdatarestcontroller { private final rsocketrequester rsocketrequester; public marketdatarestcontroller(rsocketrequester rsocketrequester) { this.rsocketrequester = rsocketrequester; } @getmapping(value = "/current/{stock}") public publisher<marketdata> current(@pathvariable("stock") string stock) { return rsocketrequester .route("currentmarketdata") .data(new marketdatarequest(stock)) .retrievemono(marketdata.class); } }
注意:在示例中, rsocket 客户端也是一个 rest 风格的 controller ,以此来访问我们的 rsocket 服务器。因此,我们使用 @restcontroller 和 @getmapping 注解来定义我们的请求/响应端点。 在端点方法中, 我们使用的是类 rsocketrequester 并指定了路由。 事实上,这个是服务器端 rsocket 所期望的路由,然后我们传递请求数据。最后,当调用 retrievemono() 方法时,springboot会帮我们初始化一个请求/响应交互。
5. spring boot rsocket 中的 fire and forget 模式
接下来我们将查看 fire and forget 交互模式。正如名字提示的一样,客户端发送一个请求给服务器,但是不期望服务器的返回响应回来。 在我们的交易程序中, 一些客户端会作为数据资源服务,并且推送市场数据给服务器端。
5.1.服务器端
我们来创建另外一个端点在我们的服务器应用程序中,如下:
@messagemapping("collectmarketdata") public mono<void> collectmarketdata(marketdata marketdata) { marketdatarepository.add(marketdata); return mono.empty(); }
我们又一次定义了一个新的 @messagemapping 路由为 collectmarketdata 。此外, spring boot自动转换传入的负载为一个 marketdata 实例。 但是,这儿最大的不同是我们返回一个 mono<void> ,因为客户端不需要服务器的返回。
5.2. 客户端
来看看我们如何初始化我们的 fire-and-forget 模式的请求。 我们将创建另外一个rest风格的端点,如下:
@getmapping(value = "/collect") public publisher<void> collect() { return rsocketrequester .route("collectmarketdata") .data(getmarketdata()) .send(); }
这儿我们指定路由和负载将是一个 marketdata 实例。 由于我们使用 send() 方法来代替 retrievemono() ,所有交互模式变成了 fire-and-forget 模式。
6. spring boot rsocket 中的 request stream
请求流是一种更复杂的交互模式, 这个模式中客户端发送一个请求,但是在一段时间内从服务器端获取到多个响应。 为了模拟这种交互模式, 客户端会询问给定股票的所有市场数据。
6.1.服务器端
我们从服务器端开始。 我们将添加另外一个消息映射方法,如下:
@messagemapping("feedmarketdata") public flux<marketdata> feedmarketdata(marketdatarequest marketdatarequest) { return marketdatarepository.getall(marketdatarequest.getstock()); }
正如所见, 这个处理器方法跟其他的处理器方法非常类似。 不同的部分是我们返回一个 flux<marketdata> 来代替 mono<marketdata> 。 最后我们的rsocket服务器会返回多个响应给客户端。
6.2.客户端
在客户端这边, 我们该创建一个端点来初始化请求/响应通信,如下:
@getmapping(value = "/feed/{stock}", produces = mediatype.text_event_stream_value) public publisher<marketdata> feed(@pathvariable("stock") string stock) { return rsocketrequester .route("feedmarketdata") .data(new marketdatarequest(stock)) .retrieveflux(marketdata.class); }
我们来研究下rsocket请求。 首先我们定义了路由和请求负载。 然后,我们定义了使用 retrieveflux() 调用的响应期望。这部分决定了交互模式。 另外注意:由于我们的客户端也是 rest 风格的服务器,客户端也定义了响应媒介类型 mediatype.text_event_stream_value 。
7.异常的处理
现在让我们看看在服务器程序中,如何以声明式的方式处理异常。 当处理请求/响应式, 我可以简单的使用 @messageexceptionhandler 注解,如下:
@messageexceptionhandler public mono<marketdata> handleexception(exception e) { return mono.just(marketdata.fromexception(e)); }
这里我们给异常处理方法标记注解为 @messageexceptionhandler 。作为结果, 这个方法将处理所有类型的异常, 因为 exception 是所有其他类型的异常的超类。 我们也可以明确地创建更多的不同类型的,不同的异常处理方法。 这当然是请求/响应模式,并且我们返回的是 mono<marketdata> 。我们期望这里的响应类型跟我们的交互模式的返回类型相匹配。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
推荐阅读
-
Spring Boot中使用RSocket的示例代码
-
Spring Boot中使用 Spring Security 构建权限系统的示例代码
-
Dubbo在Spring和Spring Boot中的使用详解
-
spring boot实现软删除的示例代码
-
Spring MVC 使用支付宝接口完成在线支付的示例代码
-
Spring Boot使用Log4j2的实例代码
-
spring boot中内嵌redis的使用方法示例
-
使用Docker部署Spring Boot的方法示例
-
使用python求斐波那契数列中第n个数的值示例代码
-
Spring Boot 使用@ConfigurationProperties注解获取配置文件中的值