欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spring WebFlux之HttpHandler的探索

程序员文章站 2023-10-16 23:07:59
这是本人正在写的《Java 编程方法论:响应式Reactor3、Reactor Netty和Spring WebFlux》一书的文章节选,它是 "《Java编程方法论:响应式RxJava与代码设计实战》" 的续篇,也可作为独立的一本来读 这是此节上半段的节选内容 HttpHandler的探索 通过前 ......

这是本人正在写的《java 编程方法论:响应式reactor3、reactor-netty和spring webflux》一书的文章节选,它是《java编程方法论:响应式rxjava与代码设计实战》的续篇,也可作为独立的一本来读

这是此节上半段的节选内容

httphandler的探索

通过前面的章节,我们已经接触了reactor-netty整个流程的设计实现细节,同时也涉及到了reactor.netty.http.server.httpserver#handle,准确得说,它是一个spi(service provider interface)接口,对外提供bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>> handler,这样,我们可以针对该handler依据自身环境进行相应实现。
spring webfluxreactor-netty都有一套属于自己的实现,只不过前者为了适应spring web的一些习惯做了大量的适配设计,整个过程比较复杂,后者提供了一套简单而灵活的实现。那么本章我们就从reactor-netty内对它的实现开始,正式向spring webflux进行过渡。

httpserverroutes设定

往往我们在给后台服务器提交http请求的时候,往往会涉及到getheadpostput这几种类型,还会包括请求地址,服务端会根据请求类型和请求地址提供对应的服务,然后才是具体的处理,那么我们是不是可以将寻找服务的这个过程抽取出来,形成服务路由查找。

于是,在reactor-netty中,设计了一个httpserverroutes接口,该接口继承了bifunction<httpserverrequest, httpserverresponse, publisher<void>>,用来路由请求,当请求来临时,对我们所设计的路由规则按顺序依次查找,直到第一个匹配,然后调用对应的处理handlerhttpserverroutes接口内针对于我们常用的getheadpostputdelete等请求设计了对应的路由规则(具体请看下面源码)。

我们在使用的时候首先会调用httpserverroutes#newroutes得到一个defaulthttpserverroutes实例,然后加入我们设计的路由规则,关于路由规则的设计,其实就是将一条条规则通过一个集合管理起来,然后在需要时进行遍历匹配即可,这里它的核心组织方法就是reactor.netty.http.server.httpserverroutes#route,在规则设计完后,我们就可以设计对应每一条规则的bifunction<httpserverrequest, httpserverresponse, publisher<void>>函数式实现,最后,当请求路由匹配成功,就可以调用我们的bifunction实现,对请求进行处理。

//reactor.netty.http.server.httpserverroutes
public interface httpserverroutes extends
                                  bifunction<httpserverrequest, httpserverresponse, publisher<void>> {

    static httpserverroutes newroutes() {
        return new defaulthttpserverroutes();
    }


    default httpserverroutes delete(string path,
            bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>> handler) {
        return route(httppredicate.delete(path), handler);
    }

    ...

    default httpserverroutes get(string path,
            bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>> handler) {
        return route(httppredicate.get(path), handler);
    }

    default httpserverroutes head(string path,
            bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>> handler) {
        return route(httppredicate.head(path), handler);
    }

    default httpserverroutes index(final bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>> handler) {
        return route(index_predicate, handler);
    }

    default httpserverroutes options(string path,
            bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>> handler) {
        return route(httppredicate.options(path), handler);
    }

    default httpserverroutes post(string path,
            bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>> handler) {
        return route(httppredicate.post(path), handler);
    }

    default httpserverroutes put(string path,
            bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>> handler) {
        return route(httppredicate.put(path), handler);
    }

    httpserverroutes route(predicate<? super httpserverrequest> condition,
            bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>> handler);

    ...

}

关于路由规则的设计,结合前面所讲,我们可以在httpserverroutes的实现类中设计一个list用来存储一条条的规则,接下来要做的就是将制定的规则一条条放入其中即可,因为这是一个添加过程,并不需要返回值,我们可以使用consumer<? super httpserverroutes>来代表这个过程。对于请求的匹配,往往都是对请求的条件判断,那我们可以使用predicate<? super httpserverrequest>来代表这个判断逻辑,由于单条路由规则匹配对应的bifunction<httpserverrequest, httpserverresponse, publisher<void>>处理,那么我们是不是可以将这两者耦合到一起,于是reactor.netty.http.server.defaulthttpserverroutes.httproutehandler就设计出来了:

//reactor.netty.http.server.defaulthttpserverroutes.httproutehandler
static final class httproutehandler
        implements bifunction<httpserverrequest, httpserverresponse, publisher<void>>,
                    predicate<httpserverrequest> {

    final predicate<? super httpserverrequest>          condition;
    final bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>>
                                                        handler;
    final function<? super string, map<string, string>> resolver;

    httproutehandler(predicate<? super httpserverrequest> condition,
            bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>> handler,
            @nullable function<? super string, map<string, string>> resolver) {
        this.condition = objects.requirenonnull(condition, "condition");
        this.handler = objects.requirenonnull(handler, "handler");
        this.resolver = resolver;
    }

    @override
    public publisher<void> apply(httpserverrequest request,
            httpserverresponse response) {
        return handler.apply(request.paramsresolver(resolver), response);
    }

    @override
    public boolean test(httpserverrequest o) {
        return condition.test(o);
    }
}

这里可能需要对request中的参数进行解析,所以对外提供了一个可供我们自定义的参数解析器实现接口:function<? super string, map<string, string>>,剩下的conditionresolver就可以按照我们前面说的逻辑进行。

此时,httproutehandler属于一个真正的请求校验者和请求业务处理者,我们现在要将它们的功能通过一系列逻辑串联形成一个处理流程,那么这里可以通过一个代理模式进行,我们在httpserverroutes的实现类中通过一个list集合管理了数量不等的httproutehandler实例,对外,我们在使用reactor.netty.http.server.httpserver#handle时只会看到一个bifunction<httpserverrequest, httpserverresponse, publisher<void>>实现,那么,所有的逻辑流程处理都应该在这个bifunctionapply(...)实现中进行,于是,我们就有下面的reactor.netty.http.server.defaulthttpserverroutes实现:

//reactor.netty.http.server.defaulthttpserverroutes
final class defaulthttpserverroutes implements httpserverroutes {


    private final copyonwritearraylist<httproutehandler> handlers =
            new copyonwritearraylist<>();
    ...
    @override
    public httpserverroutes route(predicate<? super httpserverrequest> condition,
            bifunction<? super httpserverrequest, ? super httpserverresponse, ? extends publisher<void>> handler) {
        objects.requirenonnull(condition, "condition");
        objects.requirenonnull(handler, "handler");

        if (condition instanceof httppredicate) {
            handlers.add(new httproutehandler(condition,
                    handler,
                    (httppredicate) condition));
        }
        else {
            handlers.add(new httproutehandler(condition, handler, null));
        }
        return this;
    }

    @override
    public publisher<void> apply(httpserverrequest request, httpserverresponse response) {
        final iterator<httproutehandler> iterator = handlers.iterator();
        httproutehandler cursor;

        try {
            while (iterator.hasnext()) {
                cursor = iterator.next();
                if (cursor.test(request)) {
                    return cursor.apply(request, response);
                }
            }
        }
        catch (throwable t) {
            exceptions.throwifjvmfatal(t);
            return mono.error(t); //500
        }

        return response.sendnotfound();
    }
    ...
}

可以看到route(...)方法只是做了httproutehandler实例的构建并交由handlers这个list进行管理,通过上面的apply实现将前面的内容在流程逻辑中进行组合。于是,我们就可以在reactor.netty.http.server.httpserver中设计一个route方法,对外提供一个spi接口,将我们所提到的整个过程定义在这个方法中(得到一个httpserverroutes实例,然后通过它的route方法构建规则,构建过程在前面提到的consumer<? super httpserverroutes>中进行,最后将组合成功的httpserverroutesbifunction<httpserverrequest, httpserverresponse, publisher<void>>的角色作为参数交由httpserver#handle)。

另外,我们在这里要特别注意下,在上面defaulthttpserverroutes实现的apply方法中,可以看出,一旦请求匹配,处理完后就直接返回结果,不再继续遍历匹配,也就是说每次新来的请求,只调用所声明匹配规则顺序的第一个匹配。

//reactor.netty.http.server.httpserver#route
public final httpserver route(consumer<? super httpserverroutes> routesbuilder) {
    objects.requirenonnull(routesbuilder, "routebuilder");
    httpserverroutes routes = httpserverroutes.newroutes();
    routesbuilder.accept(routes);
    return handle(routes);
}

于是,我们就可以通过下面的demo来应用上面的设计:

import reactor.core.publisher.mono;
import reactor.netty.disposableserver;
import reactor.netty.http.server.httpserver;

public class application {

    public static void main(string[] args) {
        disposableserver server =
                httpserver.create()
                          .route(routes ->
                              routes.get("/hello",        <1>
                                         (request, response) -> response.sendstring(mono.just("hello world!")))
                                    .post("/echo",        <2>
                                         (request, response) -> response.send(request.receive().retain()))
                                    .get("/path/{param}", <3>
                                         (request, response) -> response.sendstring(mono.just(request.param("param")))))
                          .bindnow();

        server.ondispose()
              .block();
    }
}

<1>处,当我们发出一个get请求去访问/hello时就会得到一个字符串hello world!

<2>处,当我们发出一个 post请求去访问 /echo时就会将请求体作为响应内容返回。

<3>处,当我们发出一个 get请求去访问 /path/{param}
时就会得到一个请求路径参数param的值。

关于sse在这里的使用,我们可以看下面这个demo,具体的代码细节就不详述了,看对应注释即可:

import com.fasterxml.jackson.databind.objectmapper;
import io.netty.buffer.bytebuf;
import io.netty.buffer.bytebufallocator;
import org.reactivestreams.publisher;
import reactor.core.publisher.flux;
import reactor.netty.disposableserver;
import reactor.netty.http.server.httpserver;
import reactor.netty.http.server.httpserverrequest;
import reactor.netty.http.server.httpserverresponse;

import java.io.bytearrayoutputstream;
import java.nio.charset.charset;
import java.time.duration;
import java.util.function.bifunction;

public class application {

    public static void main(string[] args) {
        disposableserver server =
                httpserver.create()
                          .route(routes -> routes.get("/sse", servesse()))
                          .bindnow();

        server.ondispose()
              .block();
    }

    /**
     * 准备 sse response
     * 参考 reactor.netty.http.server.httpserverresponse#sse可以知道它的"content-type" 
     * 是"text/event-stream"
     * flush策略为通过所提供的publisher来每下发一个元素就flush一次
     */
    private static bifunction<httpserverrequest, httpserverresponse, publisher<void>> servesse() {
        flux<long> flux = flux.interval(duration.ofseconds(10));
        return (request, response) ->
            response.sse()
                    .send(flux.map(application::tobytebuf), b -> true);
    }

    /**
     * 将发元素按照按照给定的格式由object转换为bytebuf。
     */
    private static bytebuf tobytebuf(object any) {
        bytearrayoutputstream out = new bytearrayoutputstream();
        try {
            out.write("data: ".getbytes(charset.defaultcharset()));
            mapper.writevalue(out, any);
            out.write("\n\n".getbytes(charset.defaultcharset()));
        }
        catch (exception e) {
            throw new runtimeexception(e);
        }
        return bytebufallocator.default
                               .buffer()
                               .writebytes(out.tobytearray());
    }

    private static final objectmapper mapper = new objectmapper();
}