Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现
响应式编程的首要问题 - 不好调试
我们在分析传统代码的时候,在哪里打了断点,就能看到直观的调用堆栈,来搞清楚,谁调用了这个代码,之前对参数做了什么修改,等等。但是在响应式编程中,这个问题就很麻烦。来看下面的例子。
public class FluxUtil1 {
public static Flux<Integer> test(Flux<Integer> integerFlux) {
return FluxUtil2.test2(integerFlux.map(Object::toString));
}
}
public class FluxUtil2 {
public static Flux<Integer> test2(Flux<String> stringFlux) {
return stringFlux.map(Integer::new);
}
}
public class FluxTest {
public static void main(String[] args) {
Flux<Integer> integerFlux = Flux.fromIterable(List.of(1, 2, 3));
FluxUtil1.test(integerFlux.log()).subscribe(integer -> {
System.out.println(integer);
});
}
}
我们调试到 subscribe 订阅消费(这个后面会讲),我们一般会想知道我们订阅的这个东西,之前经过了怎样的处理,但是在System.out.println(integer)
打断点,看到的却是:
根本看不出来是FluxUtil1
,FluxUtil2
处理过这个Flux。简单的代码还好,复杂起来调试简直要人命。官方也意识到了这一点,所以提供了一种在操作时捕捉堆栈缓存起来的机制。
这里我们先给出这些机制如何使用,后面我们会分析其中的实现原理。
1. 通过打开全局 Operator 堆栈追踪
设置reactor.trace.operatorStacktrace
这个环境变量为 true,即启动参数中加入 -Dreactor.trace.operatorStacktrace=true
,这样启动全局 Operator 堆栈追踪。
这个也可以通过代码动态打开或者关闭:
//打开
Hooks.onOperatorDebug();
//关闭
Hooks.resetOnOperatorDebug();
打开这个追踪之后,在每多一个 Operator,就会多出来一个 FluxOnAssembly(这个后面原理会详细说明)。通过这个 FluxOnAssembly,里面就有堆栈信息。怎么获取呢?可以通过Scannable.from(某个Flux).parents().collect(Collectors.toList())
获取里面所有层的 Flux,其中包含了 FluxOnAssembly, FluxOnAssembly 就包含了堆栈信息。
我们这里,在System.out.println(integer)
打断点,加入查看Scannable.from(FluxUtil1.test(integerFlux.log())).parents().collect(Collectors.toList())
,就能看到:
可以看出,每次map
操作究竟发生在哪一行代码,都能看到。
如果使用的是专业版的 IDEA,还可以配置:
然后可以在打断点 Debug 就能看到具体堆栈:
2. 通过加入 ReactorDebugAgent 实现
添加依赖:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-tools</artifactId>
<version>略</version>
</dependency>
之后,可以通过这两个代码,开启
//启用
ReactorDebugAgent.init();
//如果有类没有生效,例如初始化没加载,后来动态加载的类,可以调用这个重新处理启用
ReactorDebugAgent.processExistingClasses();
这样,可以动态修改线上应用开启Debug
模式,例如通过 Arthas 这个工具的 ognl 调用静态方法的功能(https://alibaba.github.io/arthas/ognl.html)。
如果使用的是专业版的 IDEA,还可以配置:
然后可以在打断点 Debug 就能看到具体堆栈:
响应式编程 - Flow 的理解
之前说过 FLow 是 Java 9 中引入的响应式编程的抽象概念,对应的类就是:java.util.concurrent.Flow
Flow 是一个概念类,其中定义了三个接口供实现。这三个接口分别是:Publisher
, Subscriber
和 Subscription
。
//标注是一个FunctionalInterface,因为只有一个抽象方法
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
Publisher
是负责生成 item 的,其中的subscribe
方法就是注册Subscriber
进去,用于消费。注册成功后,会调用Subscriber
的onSubscribe
方法,传Subscription
进来。这个Subscription
里面的 request 用于请求Publisher
发送多少 item 过来,cancel 用于告诉Publisher
不要再发 item 过来了。每次Publisher
有 item 生成并且没有超过Subscription
request 的个数限制,onNext
方法会被调用用于发送这个 item。当有异常发生时,onError
就会被调用。当Publisher
判断不会有新的 item 或者异常发生的时候,就会调用onComplete
告诉Subscriber
消费完成了。大体上就是这么个流程。
Project Reactor 就是Flow
的一种实现。并且在Flow
这个模型的基础上,参考了 Java 8 Stream 的接口功能设计,加入了流处理的机制。
Project Reactor - Flux
如何实现Flow
的接口
Flux就是一串相同类型数据的流,他包括并且会发射 0~n 个对象,例如:
Flux<String> just = Flux.just("1", "2", "3");
这样,我们就生成了一个包含三个字符串的Flux流(底层实现实际上就是FluxArray,这个我们以后会说的)
然后,我们按照之前 Flow 里面提到的流程,先进行简单的 subscribe
Flux.just("test1", "test2", "test3")
//打印详细流日志
.log()
//订阅消费
.subscribe(System.out::println);
运行代码,我们会看到日志输出:
07:08:13.816 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
07:08:13.822 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test1)
test1
07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test2)
test2
07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test3)
test3
07:08:13.824 [main] INFO reactor.Flux.Array.1 - | onComplete()
这些日志很清楚的说明了subscribe
究竟是如何工作的:
- 首先在
subscribe
的同时,onSubscribe
首先被调用 - 然后调用
request(unbounded)
,这里request
代表请求多少个数据,unbounded
代表请求无限个,就是所有的数据 - 对于每个数据对象,调用
onNext
方法:onNext(test1),onNext(test2),onNext(test3) - 在最后完成的时候,
onComplete
会被调用,如果说遇到了异常,那么onError
会被调用,就不会调用onComplete
了
这些方法其实都是Subscriber
的方法,Subscriber
是Flux的订阅者,配置订阅者如何消费以及消费的具体操作。
Subscriber<String> subscriber = new Subscriber<String>() {
//在订阅成功的时候,如何操作
@Override
public void onSubscribe(Subscription subscription) {
//取最大数量的元素个数
subscription.request(Long.MAX_VALUE);
}
//对于每个元素的操作
@Override
public void onNext(String o) {
System.out.println(o);
}
//在发生错误的时候
@Override
public void onError(Throwable throwable) {
log.error("error: {}", throwable.getMessage(), throwable);
}
//在完成的时候,发生错误不算完成
@Override
public void onComplete() {
log.info("complete");
}
};
Flux.just("test1", "test2", "test3")
//打印详细流日志
.log()
//订阅消费
.subscribe(subscriber);
运行后,日志是:
07:28:27.227 [main] INFO reactor.Flux.Array.2 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
07:28:27.227 [main] INFO reactor.Flux.Array.2 - | request(unbounded)
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test1)
test1
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test2)
test2
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test3)
test3
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onComplete()
07:28:27.235 [main] INFO com.test.TestMonoFlux - complete
subscribe还有如下几个api:
//在不需要消费,只需要启动Flux中间处理的话,用这个
subscribe();
//相当于:
new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
//取最大数量的元素个数
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
};
//指定消费者消费
subscribe(Consumer<? super T> consumer);
//相当于:
new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
//取最大数量的元素个数
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
consumer.accept(o);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
};
//指定消费者,还有异常处理者
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
//相当于:
new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
//取最大数量的元素个数
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
consumer.accept(o);
}
@Override
public void onError(Throwable throwable) {
errorConsumer.accept(throwable);
}
@Override
public void onComplete() {
}
};
//指定消费者,异常处理着还有完成的时候的要执行的操作
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer);
//相当于:
new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
//取最大数量的元素个数
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
consumer.accept(o);
}
@Override
public void onError(Throwable throwable) {
errorConsumer.accept(throwable);
}
@Override
public void onComplete() {
completeConsumer.run();
}
};
//指定Subscriber所有需要的元素
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
//相当于:
new Subscriber() {
@Override
public void onSubscribe(Subscription subscription) {
subscriptionConsumer.accept(subscription);
}
@Override
public void onNext(Object o) {
consumer.accept(o);
}
@Override
public void onError(Throwable throwable) {
errorConsumer.accept(throwable);
}
@Override
public void onComplete() {
completeConsumer.run();
}
};
这样,就和之前所说的Flow
的设计对应起来了。
上一篇: 2. CPUID指令的术语和表达
下一篇: MFC加载位图