欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现

程序员文章站 2022-07-14 16:28:35
...

响应式编程的首要问题 - 不好调试

我们在分析传统代码的时候,在哪里打了断点,就能看到直观的调用堆栈,来搞清楚,谁调用了这个代码,之前对参数做了什么修改,等等。但是在响应式编程中,这个问题就很麻烦。来看下面的例子。

public class FluxUtil1 {
    public static Flux<Integer> test(Flux<Integer> integerFlux) {
        return FluxUtil2.test2(integerFlux.map(Object::toString));
    }
}
public class FluxUtil2 {
    public static Flux<Integer> test2(Flux<String> stringFlux) {
        return stringFlux.map(Integer::new);
    }
}
public class FluxTest {
    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux.fromIterable(List.of(1, 2, 3));
        FluxUtil1.test(integerFlux.log()).subscribe(integer -> {
            System.out.println(integer);
        });
    }
}

我们调试到 subscribe 订阅消费(这个后面会讲),我们一般会想知道我们订阅的这个东西,之前经过了怎样的处理,但是在System.out.println(integer)打断点,看到的却是:

Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现

根本看不出来是FluxUtil1FluxUtil2处理过这个Flux。简单的代码还好,复杂起来调试简直要人命。官方也意识到了这一点,所以提供了一种在操作时捕捉堆栈缓存起来的机制。

这里我们先给出这些机制如何使用,后面我们会分析其中的实现原理。

1. 通过打开全局 Operator 堆栈追踪

设置reactor.trace.operatorStacktrace这个环境变量为 true,即启动参数中加入 -Dreactor.trace.operatorStacktrace=true,这样启动全局 Operator 堆栈追踪。

这个也可以通过代码动态打开或者关闭:

//打开
Hooks.onOperatorDebug();
//关闭
Hooks.resetOnOperatorDebug();

打开这个追踪之后,在每多一个 Operator,就会多出来一个 FluxOnAssembly(这个后面原理会详细说明)。通过这个 FluxOnAssembly,里面就有堆栈信息。怎么获取呢?可以通过Scannable.from(某个Flux).parents().collect(Collectors.toList())获取里面所有层的 Flux,其中包含了 FluxOnAssembly, FluxOnAssembly 就包含了堆栈信息。

我们这里,在System.out.println(integer)打断点,加入查看Scannable.from(FluxUtil1.test(integerFlux.log())).parents().collect(Collectors.toList()),就能看到:

Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现

可以看出,每次map操作究竟发生在哪一行代码,都能看到。

如果使用的是专业版的 IDEA,还可以配置:
Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现

然后可以在打断点 Debug 就能看到具体堆栈:
Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现

2. 通过加入 ReactorDebugAgent 实现

添加依赖:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-tools</artifactId>
    <version>略</version>
</dependency>

之后,可以通过这两个代码,开启

//启用
ReactorDebugAgent.init();
//如果有类没有生效,例如初始化没加载,后来动态加载的类,可以调用这个重新处理启用
ReactorDebugAgent.processExistingClasses();

这样,可以动态修改线上应用开启Debug模式,例如通过 Arthas 这个工具的 ognl 调用静态方法的功能(https://alibaba.github.io/arthas/ognl.html)。

如果使用的是专业版的 IDEA,还可以配置:
Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现

然后可以在打断点 Debug 就能看到具体堆栈:
Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现

响应式编程 - Flow 的理解

之前说过 FLow 是 Java 9 中引入的响应式编程的抽象概念,对应的类就是:java.util.concurrent.Flow
Flow 是一个概念类,其中定义了三个接口供实现。这三个接口分别是:Publisher, SubscriberSubscription

//标注是一个FunctionalInterface,因为只有一个抽象方法
@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}
public static interface Subscription {
    public void request(long n);
    public void cancel();
}

Publisher是负责生成 item 的,其中的subscribe方法就是注册Subscriber进去,用于消费。注册成功后,会调用SubscriberonSubscribe方法,传Subscription进来。这个Subscription里面的 request 用于请求Publisher发送多少 item 过来,cancel 用于告诉Publisher不要再发 item 过来了。每次Publisher有 item 生成并且没有超过Subscription request 的个数限制,onNext方法会被调用用于发送这个 item。当有异常发生时,onError 就会被调用。当Publisher判断不会有新的 item 或者异常发生的时候,就会调用onComplete告诉Subscriber消费完成了。大体上就是这么个流程。

Project Reactor 就是Flow的一种实现。并且在Flow这个模型的基础上,参考了 Java 8 Stream 的接口功能设计,加入了流处理的机制。

Project Reactor - Flux如何实现Flow的接口

Flux就是一串相同类型数据的流,他包括并且会发射 0~n 个对象,例如:

Flux<String> just = Flux.just("1", "2", "3");

这样,我们就生成了一个包含三个字符串的Flux流(底层实现实际上就是FluxArray,这个我们以后会说的)

然后,我们按照之前 Flow 里面提到的流程,先进行简单的 subscribe

Flux.just("test1", "test2", "test3")
    //打印详细流日志
    .log()
    //订阅消费
    .subscribe(System.out::println);

运行代码,我们会看到日志输出:

07:08:13.816 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
07:08:13.822 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test1)
test1
07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test2)
test2
07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test3)
test3
07:08:13.824 [main] INFO reactor.Flux.Array.1 - | onComplete()

这些日志很清楚的说明了subscribe究竟是如何工作的:

  1. 首先在subscribe的同时,onSubscribe首先被调用
  2. 然后调用request(unbounded),这里request代表请求多少个数据,unbounded代表请求无限个,就是所有的数据
  3. 对于每个数据对象,调用onNext方法:onNext(test1),onNext(test2),onNext(test3)
  4. 在最后完成的时候,onComplete会被调用,如果说遇到了异常,那么onError会被调用,就不会调用onComplete
    这些方法其实都是Subscriber的方法,Subscriber是Flux的订阅者,配置订阅者如何消费以及消费的具体操作。
Subscriber<String> subscriber = new Subscriber<String>() {
    //在订阅成功的时候,如何操作
    @Override
    public void onSubscribe(Subscription subscription) {
        //取最大数量的元素个数
        subscription.request(Long.MAX_VALUE);
    }

    //对于每个元素的操作
    @Override
    public void onNext(String o) {
        System.out.println(o);
    }

    //在发生错误的时候
    @Override
    public void onError(Throwable throwable) {
        log.error("error: {}", throwable.getMessage(), throwable);
    }

    //在完成的时候,发生错误不算完成
    @Override
    public void onComplete() {
        log.info("complete");
    }
};

Flux.just("test1", "test2", "test3")
    //打印详细流日志
    .log()
    //订阅消费
    .subscribe(subscriber);

运行后,日志是:

07:28:27.227 [main] INFO reactor.Flux.Array.2 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
07:28:27.227 [main] INFO reactor.Flux.Array.2 - | request(unbounded)
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test1)
test1
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test2)
test2
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test3)
test3
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onComplete()
07:28:27.235 [main] INFO com.test.TestMonoFlux - complete

subscribe还有如下几个api:

//在不需要消费,只需要启动Flux中间处理的话,用这个
subscribe();
//相当于:
new Subscriber() {
    @Override
    public void onSubscribe(Subscription subscription) {
        //取最大数量的元素个数
        subscription.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(Object o) {
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onComplete() {
    }
};


//指定消费者消费
subscribe(Consumer<? super T> consumer); 
//相当于:
new Subscriber() {
    @Override
    public void onSubscribe(Subscription subscription) {
        //取最大数量的元素个数
        subscription.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(Object o) {
        consumer.accept(o);
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onComplete() {
    }
};


//指定消费者,还有异常处理者
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); 
//相当于:
new Subscriber() {
    @Override
    public void onSubscribe(Subscription subscription) {
        //取最大数量的元素个数
        subscription.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(Object o) {
        consumer.accept(o);
    }
    @Override
    public void onError(Throwable throwable) {
        errorConsumer.accept(throwable);
    }
    @Override
    public void onComplete() {
    }
};


//指定消费者,异常处理着还有完成的时候的要执行的操作
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer);
//相当于:
new Subscriber() {
    @Override
    public void onSubscribe(Subscription subscription) {
        //取最大数量的元素个数
        subscription.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(Object o) {
        consumer.accept(o);
    }
    @Override
    public void onError(Throwable throwable) {
        errorConsumer.accept(throwable);
    }
    @Override
    public void onComplete() {
        completeConsumer.run();
    }
};

//指定Subscriber所有需要的元素
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer); 
//相当于:
new Subscriber() {
    @Override
    public void onSubscribe(Subscription subscription) {
        subscriptionConsumer.accept(subscription);
    }
    @Override
    public void onNext(Object o) {
        consumer.accept(o);
    }
    @Override
    public void onError(Throwable throwable) {
        errorConsumer.accept(throwable);
    }
    @Override
    public void onComplete() {
        completeConsumer.run();
    }
};

这样,就和之前所说的Flow的设计对应起来了。