欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Reactive-MongoDB 异步 Java Driver 解读

程序员文章站 2022-11-08 10:46:09
一、关于 异步驱动 从3.0 版本开始,MongoDB 开始提供异步方式的驱动(Java Async Driver),这为应用提供了一种更高性能的选择。但实质上,使用同步驱动(Java Sync Driver)的项目也不在少数,或许是因为先入为主的原因(同步Driver的文档说明更加的完善),又或者 ......

一、关于 异步驱动

从3.0 版本开始,mongodb 开始提供异步方式的驱动(java async driver),这为应用提供了一种更高性能的选择。
但实质上,使用同步驱动(java sync driver)的项目也不在少数,或许是因为先入为主的原因(同步driver的文档说明更加的完善),又或者是为了兼容旧的 mongodb 版本。
无论如何,由于 reactive 的发展,未来使用异步驱动应该是一个趋势。

在使用 async driver 之前,需要对 reactive 的概念有一些熟悉。

二、理解 reactive (响应式)

响应式(reactive)是一种异步的、面向数据流的开发方式,最早是来自于.net 平台上的 reactive extensions 库,随后被扩展为各种编程语言的实现。
在著名的 reactive manifesto(响应式宣言) 中,对 reactive 定义了四个特征:

  • 及时响应(responsive):系统能及时的响应请求。
  • 有韧性(resilient):系统在出现异常时仍然可以响应,即支持容错。
  • 有弹性(elastic):在不同的负载下,系统可弹性伸缩来保证运行。
  • 消息驱动(message driven):不同组件之间使用异步消息传递来进行交互,并确保松耦合及相互隔离。

在响应式宣言的所定义的这些系统特征中,无一不与响应式的流有若干的关系,于是乎就有了 2013年发起的 响应式流规范(reactive stream specification)。

https://www.reactive-streams.org/

其中,对于响应式流的处理环节又做了如下定义:

  • 具有处理无限数量的元素的能力,即允许流永不结束
  • 按序处理
  • 异步地传递元素
  • 实现非阻塞的负压(back-pressure)

java 平台则是在 jdk 9 版本上发布了对 reactive streams 的支持。

下面介绍响应式流的几个关键接口:

  • publisher
    publisher 是数据的发布者。publisher 接口只有一个方法 subscribe,用于添加数据的订阅者,也就是 subscriber。
  • subscriber
    subscriber 是数据的订阅者。subscriber 接口有4个方法,都是作为不同事件的处理器。在订阅者成功订阅到发布者之后,其 onsubscribe(subscription s) 方法会被调用。
    subscription 表示的是当前的订阅关系。

当订阅成功后,可以使用 subscription 的 request(long n) 方法来请求发布者发布 n 条数据。发布者可能产生3种不同的消息通知,分别对应 subscriber 的另外3个回调方法。

数据通知:对应 onnext 方法,表示发布者产生的数据。
错误通知:对应 onerror 方法,表示发布者产生了错误。
结束通知:对应 oncomplete 方法,表示发布者已经完成了所有数据的发布。
在上述3种通知中,错误通知和结束通知都是终结通知,也就是在终结通知之后,不会再有其他通知产生。

  • subscription
    subscription 表示的是一个订阅关系。除了之前提到的 request 方法之外,还有 cancel 方法用来取消订阅。需要注意的是,在 cancel 方法调用之后,发布者仍然有可能继续发布通知。但订阅最终会被取消。

这几个接口的关系如下图所示:

图片出处:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html

mongodb 的异步驱动为 mongo-java-driver-reactivestreams 组件,其实现了 reactive stream 的上述接口。

> 除了 reactivestream 之外,mongodb 的异步驱动还包含 rxjava 等风格的版本,有兴趣的读者可以进一步了解

http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/

三、使用示例

接下来,通过一个简单的例子来演示一下 reactive 方式的代码风格:

a. 引入依赖

    org.mongodb
    mongodb-driver-reactivestreams
    1.11.0

> 引入mongodb-driver-reactivestreams 将会自动添加 reactive-streams, bson, mongodb-driver-async组件

b. 连接数据库

//服务器实例表
list servers = new arraylist();
servers.add(new serveraddress("localhost", 27018));

//配置构建器
mongoclientsettings.builder settingsbuilder = mongoclientsettings.builder();

//传入服务器实例
settingsbuilder.applytoclustersettings(
        builder -> builder.hosts(servers));

//构建 client 实例
mongoclient mongoclient = mongoclients.create(settingsbuilder.build());

c. 实现文档查询

//获得数据库对象
mongodatabase database = client.getdatabase(databasename);

//获得集合
mongocollection collection = database.getcollection(collectionname);

//异步返回publisher
findpublisher publisher = collection.find();

//订阅实现
publisher.subscribe(new subscriber() {
    @override
    public void onsubscribe(subscription s) {
        system.out.println("start...");
        //执行请求
        s.request(integer.max_value);

    }
    @override
    public void onnext(document document) {
        //获得文档
        system.out.println("document:" + document.tojson());
    }

    @override
    public void onerror(throwable t) {
        system.out.println("error occurs.");
    }

    @override
    public void oncomplete() {
        system.out.println("finished.");
    }
});

注意到,与使用同步驱动不同的是,collection.find()方法返回的不是 cursor,而是一个 findpublisher对象,这是publisher接口的一层扩展。
而且,在返回 publisher 对象时,此时并没有产生真正的数据库io请求。 真正发起请求需要通过调用 subscription.request()方法。
在上面的代码中,为了读取由 publisher 产生的结果,通过自定义一个subscriber,在onsubscribe 事件触发时就执行 数据库的请求,之后分别对 onnext、onerror、oncomplete进行处理。

尽管这种实现方式是纯异步的,但在使用上比较繁琐。试想如果对于每个数据库操作都要完成一个subscriber 逻辑,那么开发的工作量是巨大的。

为了尽可能复用重复的逻辑,可以对subscriber的逻辑做一层封装,包含如下功能:

  • 使用 list 容器对请求结果进行缓存
  • 实现阻塞等待结果的方法,可指定超时时间
  • 捕获异常,在等待结果时抛出

代码如下:

public class observablesubscriber implements subscriber {

    //响应数据
    private final list received;
    //错误信息
    private final list errors;
    //等待对象
    private final countdownlatch latch;
    //订阅器
    private volatile subscription subscription;
    //是否完成
    private volatile boolean completed;

    public observablesubscriber() {
        this.received = new arraylist();
        this.errors = new arraylist();
        this.latch = new countdownlatch(1);
    }

    @override
    public void onsubscribe(final subscription s) {
        subscription = s;
    }

    @override
    public void onnext(final t t) {
        received.add(t);
    }

    @override
    public void onerror(final throwable t) {
        errors.add(t);
        oncomplete();
    }

    @override
    public void oncomplete() {
        completed = true;
        latch.countdown();
    }

    public subscription getsubscription() {
        return subscription;
    }

    public list getreceived() {
        return received;
    }

    public throwable geterror() {
        if (errors.size() > 0) {
            return errors.get(0);
        }
        return null;
    }

    public boolean iscompleted() {
        return completed;
    }

    /**
     * 阻塞一定时间等待结果
     *
     * @param timeout
     * @param unit
     * @return
     * @throws throwable
     */
    public list get(final long timeout, final timeunit unit) throws throwable {
        return await(timeout, unit).getreceived();
    }

    /**
     * 一直阻塞等待请求完成
     *
     * @return
     * @throws throwable
     */
    public observablesubscriber await() throws throwable {
        return await(long.max_value, timeunit.milliseconds);
    }

    /**
     * 阻塞一定时间等待完成
     *
     * @param timeout
     * @param unit
     * @return
     * @throws throwable
     */
    public observablesubscriber await(final long timeout, final timeunit unit) throws throwable {
        subscription.request(integer.max_value);
        if (!latch.await(timeout, unit)) {
            throw new mongotimeoutexception("publisher oncomplete timed out");
        }
        if (!errors.isempty()) {
            throw errors.get(0);
        }
        return this;
    }
}

借助这个基础的工具类,我们对于文档的异步操作就变得简单多了。
比如对于文档查询的操作可以改造如下:

observablesubscriber subscriber = new observablesubscriber();
collection.find().subscribe(subscriber);

//结果处理
subscriber.get(15, timeunit.seconds).foreach( d -> {
    system.out.println("document:" + d.tojson());
});

当然,这个例子还有可以继续完善,比如使用 list 作为缓存,则要考虑数据量的问题,避免将全部(或超量) 的文档一次性转入内存。

原文地址: