欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

Vertx快速入门参考

程序员文章站 2022-03-10 18:36:26
Vertx学习什么是vertx?Vert.x最大的特点就在于异步(底层基于Netty),通过事件循环(EventLoop)来调起存储在异步任务队列(CallBackQueue)中的任务,大大降低了传统阻塞模型中线程对于操作系统的开销。因此相比较传统的阻塞模型,异步模型能够很大层度的提高系统的并发量。核心点1:异步模型框架Vert.x除了异步之外,还提供了非常多的吸引人的技术,比如EventBus,通过EventBus可以非常简单的实现分布式消息,进而为分布式系统调用,微服务奠定基础。除此之外,还...

Vertx学习

  1. 什么是vertx?

Vert.x最大的特点就在于异步(底层基于Netty),通过事件循环(EventLoop)来调起存储在异步任务队列(CallBackQueue)中的任务,大大降低了传统阻塞模型中线程对于操作系统的开销。因此相比较传统的阻塞模型,异步模型能够很大层度的提高系统的并发量。

核心点1:异步模型框架

Vert.x除了异步之外,还提供了非常多的吸引人的技术,比如EventBus,通过EventBus可以非常简单的实现分布式消息,进而为分布式系统调用,微服务奠定基础。除此之外,还提供了对多种客户端的支持,比如Redis,RabbitMQ,Kafka等等。

核心点2:良好的扩展兼容性

Vert.x异步也带来了编码上的复杂性,想要编写优美的异步代码,就需要对lambda表达式、函数式编程、Reactive等技术非常熟悉才行,否则很容易导致你的代码一团糟,完全没有可读性。(Java 8以上)另外,异步模型的性能调优、异常处理与同步模型有很大差异,网络中相关资料较少,使用中遇到问题排查困难,这也是目前国内架构师不愿意选择Vert.x的原因。(挑战)

Vert.x运行在Java虚拟机上,支持多种编程语言,Vert.x是高度模块化的,同一个应用,你可以选择多种编程语言同时开发。在Vert.x 2版本,也就是基于JDK7,还没有lambda的时候,一般来讲,使用JavaScript作为开发语言相对较多,到Vert.x3的时代,因为JDK8的出现,Java已经作为Vert.x主流的开发语言,而Vert.x也被更多的开发者所接受。

Java能做的,Vert.x都能做。主要讨论,Vert.x善于做哪些事情!

(1)Web开发,Vert.x封装了Web开发常用的组件,支持路由、Session管理、模板等,可以非常方便的进行Web开发。不需要容器!不需要容器!不需要容器!

(2)TCP/UDP开发,Vert.x底层基于Netty,提供了丰富的IO类库,支持多种网络应用开发。不需要处理底层细节(如拆包和粘包),注重业务代码编写。(这里可以疯狂吐槽了,因为注重业务代码,也就逼着我们去看源码,哎)

(3)提供对WebSocket的支持,可以做网络聊天室,动态推送等。(这说明了用途确实多啊)

(4)Event Bus(事件总线)是Vert.x的神经系统,通过Event Bus可以实现分布式消息,远程方法调用等等。正是因为Event Bus的存在,Vert.x可以非常便捷的开发微服务应用。

(5)支持主流的数据和消息的访问 redis mongodb rabbitmq kafka 等

(6)分布式锁,分布式计数器,分布式map的支持。(这得好好看看,到底怎么实现的?)

(7)Vert.x底层通信框架依赖于Netty,并封装了对Http协议的支持,因此可以非常方便的进行Web开发,且不依赖于任何中间件。笔者所在的公司老系统使用的是SSM架构的项目,部署在Weblogic上,每年花在中间件上的钱就非常多,现在全面改造为Vert.x,中间件的费用直接就省了。另外不依赖中间件,编程会变得非常灵活,定制性非常强,安全性也会得到一定层度的提高。

(8)为微服务而生

Vert .x提供了各种组件来构建基于微服务的应用程序。通过EventBus可以非常容易的进行服务之间的交互。并且提供了HAZELCAST来实现分布式。

当然了,除了一些优势以外,要在项目中选择使用Vert.x还要考虑一些问题,这里不展开说明,只是根据个人的使用经验提出一些点。

* Vert.x使用JDK8的Lambda,所以要使用Vert.x首先需要对JDK8比较熟悉。当然,对于一个开发者,现在JDK已经出到JDK11了,JDK8的特性也理应该学习一下

* 对于复杂的业务,可能会遇到Callback Hell问题(解决方案也有很多)

* 由于异步的特征、契约创建、更高级的错误处理机制使得开发过程会相对更复杂。(来自并发编程网)

对于实现一个简单的web服务,有很多种选择,简单划为三种

  1. 这是使用最多的一种,也是很多的Java开发者可能最先想到的,就是使用Java中间件来实现,只要下载一个Tomcat,再编写个web项目就可以对外提供Web服务。这种方式我们完全不需要考虑线程的交互,不需要考虑网络协议,只需要关注业务逻辑,可以说是一种全包的形式。
  2. 使用Java原生的Socket或者NIO来实现,但这种方式比较复杂,对编程功底要求最高,而且自己实现很难保证性能和稳定性。这种方式完全需要手动处理,很少会使用这种方式来实现HTTPServer,可以说这是最为原始形式。
  3. 介于全包和原始形式之间,就是用第三方的框架来实现,比如Vertx或者偏底层的Netty。你可以利用框架封装的API简单的创建一个HttpServer,并不需要关注底层的通信协议。这种方式更为灵活,一般来讲性能也更高,并且不依赖中间件。

下面简单的来实现一个HttpServer,通过浏览器访问这个HttpServer能够在浏览器上显示HelloWorld。下面简单列出实现的步骤:

  1. 创建一个Maven项目,并配置依赖的包。(这里仅仅需要引入vertx-core的包即可)
  2. 创建一个核心类,创建main方法,直接在main方法中编写代码即可(后期会进行改造)
  3. 直接运行核心类,并通过浏览器访问

代码model:

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerResponse;

public class MyHttpServer   {

    public static void main(String[] args){

//        获取vertx实例,创建httpserver
        // java 8新特性,接口中定义静态方法
        Vertx vertx = Vertx.vertx();
        HttpServer server = vertx.createHttpServer();
        server.requestHandler(httpServerRequest -> {
            HttpServerResponse response = httpServerRequest.response();

//            设置响应头
            response.putHeader("content-type","text/html;charset=utf-8");
//            设置响应数据
            response.end("hello world!");
        });
        server.listen(8888);
    }
}

在上面演示的创建HttpServer的方式我们会发现有一个很大的问题,就是所有的代码都写在main方法中,这样显然是不好的。Vert.x推荐的写法是为每一个应用创建一个Verticle,也就是Vert.x的模块,然后通过Vertx核心接口,部署Verticle模块。多个Verticle模块之间可以通过EventBus进行相互调用。

这里提到了Verticle、EventBus等,这些都是Vert.x中一些非常重要的概念。Verticle可以简单的理解为继承了AbstractVerticle的类都是一个Verticle,每个Verticle可以单独部署或者单独运行。EventBus看不见,摸不着,可以简单的理解为,各Verticle模块之间通信的桥梁。

下面我们就创建一个HttpServer的Verticle模块,并部署到Vertx中运行,实现步骤如下:

  1. 创建一个类,继承AbstractVerticle类
  2. 重写start方法和stop方法,在start方法中处理业务逻辑,stop方法中释放资源
  3. 在main方法中部署Verticle模块
  4. 启动服务,并通过浏览器进行访问

Vertx和spring的比较:

Vert.x可以开发Web应用,但Vert.x不仅仅是一个Web开发框架,他更像Spring,是一个技术栈(Vert.x生态可以查看https://github.com/vert-x3/vertx-awesome),或者说是一个Vert.x生态体系。在这个体系中,Vert.x只是提供了Web开发的能力。下面对Vertx和Spring做一个对比:

项目 Spring Vertx
核心框架 spring-core vertx-core
Web开发 spring-webmvc vertx-web
jdbc框架 spring-jdbc vertx-jdbc-client
redis spring-data-redis vertx-redis-client
微服务 spring-cloud vertx-hazelcast

可以说,很多的spring能做的事情,Vertx也都能实现。那么既然如此,Spring如此强大,社区如此活跃,为何还会有Vertx呢?他们之前区别的核心点就只有一个,Spring的操作是同步的,Vertx的操作是异步的。异步带来了更高的性能,但同时也带来了编码和调试的复杂度,但不得不说异步可能是未来的一个趋势,至少在Java实现高性能服务器上的一个趋势。

在Java领域,做Web开发我们一般有很多的选择,比如使用原生的Servlet,比如使用SpringMVC,再比如使用Struts等等总之你有很多的选择。在国内,目前来讲,SpringMVC作为Spring体系下的Web层架构,是深受企业青睐的,绝大部分的企业可能都在使用SpringMVC。而对于我们今天要说的Vert.x这个Web层框架,却很少有人知道,但它却是仅次于SpringMVC,排名第二的一个Web层框架。

Vert.x技术体系

上面也提到了,Vert.x和Spring一样,也有着完善的生态,具体可以查看https://github.com/vert-x3/vertx-awesome 我们可以看到,每一块内容都提供了多种的实现,有官方支持的版本还有社区版本。下面我们具体介绍下技术体系中官方支持的版本。

(1)核心模块

Vert.x核心模块包含一些基础的功能,如HTTP,TCP,文件系统访问,EventBus、WebSocket、延时与重复执行、缓存等其他基础的功能,你可以在你自己的应用程序中直接使用。可以通过vertx-core模块引用即可。vertx接口整体如下:
Vertx快速入门参考
一个重要的实现类为VertxImpl,VertxImpl的实现结构如下:
Vertx快速入门参考
首先了解下数据总线,有点计算机组原理中的总线一样,最大的缺点也是类似,但也为某些问题的解决提供了思路,如不同的隔离的线程之间的事件互相使用,则可通过数据总线,但是对于高并发的场景,数据总线要慎重使用:

  1. EventBus用于进行消息传输;EventLoopGroup为事件循环组,是Netty库中的类,每当有新的任务都会被提交到该组中执行;

  2. 而另一个EventLoopGroup——acceptorEventLoopGroup专用于网络服务的创建,目的是避免上面的eventLoopGroup的阻塞造成服务响应不及时;

  3. WorkerPool为单独开的线程池,负责执行阻塞操作;

  4. FileSystem用于操作文件;

  5. AddressResolver用于进行DNS地址解析;

  6. SharedData用于在整个Vertx应用内部共享数据,包括集群模式;

  7. ClusterManager用于进行集群管理;

  8. DeploymentManager和VerticleManager用于发布Verticle,保证Verticle的特性。

所有上述类你可能都不是很熟悉,没关系,先有个印象,下面分析具体场景时会用到。

由于是事件驱动的高并发框架,所以先了解下框架对于事件是如何处理的?那就需要了解下EventBus。
Vertx快速入门参考

  1. 出入拦截器自不必说,每次消息进来和出去都会先被拦截器处理;

  2. vertx对象,主要用于获取发送调用代码所处的上线文环境;

  3. handerMap是核心,以地址为key,地址上注册的Handler序列为value,存储了地址-处理器的映射管理;当触发发送动作时,就会到该映射中查找对应的处理器然后执行;对于单机应用,handlerMap就是所有;对于集群应用,则是先找到节点,再在节点中的handlerMap查找对应处理器。

  4. sendNoContext是为了在执行发送的代码块不处于任何上下文时使用的上下文。EventBusImpl创建时使用。

EventLoop

Vertx中并没有EventLoop这个类,它是Netty中的类。对Vertx的源码,与EventLoop相关的交互只有两处:创建EventLoopGroup;向EventLoopGroup提交任务。

Context

Context是真正提交任务的地方,凡Vertx中涉及到任务的执行,总是少不了Context的身影。

其核心能力主要在协调代码的运行,同时也可存储数据。其大部分逻辑都在ContextImpl中。其两个子类,仅在自我裁定、任务提交、上下文复制上有所不同。
Vertx快速入门参考

Verticle

Verticle放在这里有一点另类,因为它并非核心组件。只是Vertx提供的actor模式实现的一个发布单元。它的actor特性由VerticleManager、EventBus、Context等一起保证。就其能力来说,也只有启动和停止两个方法。

Vertx快速入门参考

(2)Web模块

Vert.x Web是一个工具集,虽然核心模块提供了HTTP的支持,但是要开发复杂的Web应用,还需要路由、Session、请求数据读取、Rest支持等等还需要Web模块,这里提供了上述的这些功能的API,便于开发。

除了对Web服务的开发以外,还提供了对Web客户端请求的支持,通过vertx-web-client即可方便的访问HTTP服务。有朋友可能会有疑惑,我明明可以使用JDK提供的URL来请求HTTP服务啊。

使用Vert.x一定要注意,Vert.x是一个异步框架,请求HTTP服务是一个耗时操作,所有的耗时,都会阻塞EventBus,导致整体性能被拖垮,因此,对于请求Web服务,一定要使用Vert.x提供的vertx-web-client模块

(3)数据访问模块

Vert.x提供了对关系型数据库、NoSQL、消息中间件的支持,传统的客户端因为是阻塞的,会严重影响系统的性能,因此Vert.x提供了对以上客户端的异步支持。具体支持的数据访问如下:

MongoDB client,JDBC client,SQL common,Redis client,MySQL/PostgreSQLclient

(4)Reactive响应式编程

复杂的异步操作,会导致异步回调地狱的产生,看下面的代码,这是我在Vert.x提供的例子中找到的,我们不去管这段代码干了啥,只是看后面的}就很惊讶了,如果操作更为复杂一些,会嵌套的层次更多,通过reactive可以最小化的简化异步回调地狱。

// create a test table
execute(conn.result(), "create table test(id int primary key, name varchar(255))", create -> {
  // start a transaction
  startTx(conn.result(), beginTrans -> {
    // insert some test data
    execute(conn.result(), "insert into test values(1, 'Hello')", insert -> {
      // commit data
      rollbackTx(conn.result(), rollbackTrans -> {
        // query some data
        query(conn.result(), "select count(*) from test", rs -> {
          for (JsonArray line : rs.getResults()) {
            System.out.println(line.encode());
          }
          // and close the connection
          conn.result().close(done -> {
            if (done.failed()) {
              throw new RuntimeException(done.cause());
            }
          });
        });
      });
    });
  });
});

再看一个使用Reactive2构建的多步操作的代码,paramCheckStep,insertPayDtlStep,requestStep等等都是异步方法,但这里就很好的处理了异步回调的问题,不再有那么多层的大括号,代码结构也geng

public void scanPay(JsonObject data, Handler<AsyncResult<JsonObject>> resultHandler) {
    paramCheckStep(data) // 参数校验
            .flatMap(this::insertPayDtlStep) // 插入流水
            .flatMap(x -> requestStep(x, config)) // 请求上游
            .flatMap(this::cleanStep) //参数清理
            .subscribe(ok -> {
                        logger.info("成功结束");
                        resultHandler.handle(Future.succeededFuture(ok));
                    },
                    err -> {
                        logger.error("正在结束", err);
                        resultHandler.handle(Future.failedFuture(err));
                    }
            );
}

(5)整合其他模块

邮件客户端

Vert.x提供了一简单STMP邮件客户端,所以你可以在应用程序中发送电子邮件。

STOMP客户端与服务端

Vert.x提供了STOMP协议的实现包括客户端与服务端。

Consul Client

consul是google开源的一个使用go语言开发的服务发现、配置管理中心服务。内置了服务注册与发现框 架、分布一致性协议实现、健康检查、Key/Value存储、多数据中心方案。

RabbitMQ Client Kafka Client

消息队里的客户端支持

JCA适配器

Vert.x提供了Java连接器架构适配器,这允许同任意JavaEE应用服务器进行互操作。

(6)认证与授权

Vert.x提供了简单API用于在应用中提供认证和授权。

Auth common 通用的认证API,可以通过重写AuthProvider类来实现自己的认证

JDBC auth 后台为JDBC的认证实现

JWT auth 用JSON Web tokens认证实现

Shiro auth 使用Apache Shiro认证实现

MongoDB auth MongoDB认证实现

OAuth2 Oauth2协义认证实现

htdigest auth 这个是新增一种认证的支持

(7)微服务

Vert.x提供多个组件构建基于微服务的应用程序。

比如服务发现(Vert.x Service Discovery)、断路器(Vert.x Circuit Breaker)、配置中心(Vert.x Config)等。

Vert.x简明介绍就说到这里,最后,技术是为业务服务的,在选择架构的时候,也要考虑用人的成本,也正是因为如此,国内使用Vert.x的企业还不是很多。但是我相信,未来,一定是异步非阻塞的天下。

Vert.x(vertx) Web开发-路由

Vert.x提供了Web开发组件vertx-web,提供了一堆Web开发中常用的功能。比如参数封装,路由,国际化,认证和授权,session和cookie以及模板等,可以非常方便的进行Vert.x Web开发。主要介绍Web开发中的路由功能,路由简单说就是把用户请求交给合适的处理器处理的组件
Vertx快速入门参考
路由是Web开发中最基础也是最常用的功能,Vert.x提供了强大的路由功能,包括正则匹配,二级路由等。本文从两个方面来讲述路由,分别是路由的使用和路由的实现原理

简单案例:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Router;

public class SimpleRouter extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        HttpServer server = vertx.createHttpServer();

        // 设置路由对象
        Router router = Router.router(vertx);

        router.route("/index").handler(request -> {
            request.response().end("SUCCESS !");
        });
        
        // 方法限制
		// router.post("/post").handler(request -> {
		// 	request.response().end("post");
		// });
        //
		// router.route(HttpMethod.GET, "/method").handler(request -> {
		// 	request.response().end("method");
		// });
        
        
        // 核心点,将这个路由策略交给路由对象
        server.requestHandler(router::accept);
        server.listen(8888);
        System.out.println("start success ! ");
    }

    public static void main(String[] args){
        Vertx.vertx().deployVerticle(new SimpleRouter());
    }
}

关键步骤就是将使用router限制请求路径,本质上是处理资源的访问路径。

从EventBus看Vertx工作原理

一个简单的Vertx应用如下,我们从它开始分析。

fun main() {
  val vertx = Vertx.vertx();
  vertx.eventBus().consumer<String>("helloAddress").handler{
    print(it.body())
  }
  vertx.eventBus().send("helloAddress", "hello world!")
}
1234567

Vertx.vertx()在上面已经看过了,它创建了一个VertxImpl对象,持有一堆用于组织工作的属性,包括EventBus。

// vertx实例时对eventBus赋值的快照
this.eventBus = new EventBusImpl(this);
12

consumer做了什么

@Override
public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler) {
    Objects.requireNonNull(handler, "handler");
    MessageConsumer<T> consumer = consumer(address);
    consumer.handler(handler);
    return consumer;
}
// 往里进一步
@Override
public <T> MessageConsumer<T> consumer(String address) {
    checkStarted();
    Objects.requireNonNull(address, "address");
    return new HandlerRegistration<>(vertx, metrics, this, address, null, false, null, -1);
}
// 重点在HandlerRegistration,收集地址后,开启超时回复定时器。
public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address,
                               String repliedAddress, boolean localOnly,
                               Handler<AsyncResult<Message<T>>> asyncResultHandler, long timeout) {
    this.vertx = vertx;
    this.metrics = metrics;
    this.eventBus = eventBus;
    this.address = address;
    this.repliedAddress = repliedAddress;
    this.localOnly = localOnly;
    this.asyncResultHandler = asyncResultHandler;
    if (timeout != -1) {
        timeoutID = vertx.setTimer(timeout, tid -> {
            if (metrics != null) {
                metrics.replyFailure(address, ReplyFailure.TIMEOUT);
            }
            sendAsyncResultFailure(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress));
        });
    }
}
// 最上面的consumer.handler(handler);调用了HandlerRegistration的handler方法,如下。可以看到最终是在eventBus上调用了注册方法。
@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
    if (h != null) {
        synchronized (this) {
            handler = h;
            if (registered == null) {
                registered = eventBus.addRegistration(address, this, repliedAddress != null, localOnly);
            }
        }
        return this;
    }
    this.unregister();
    return this;
}
// 最终来到了EventBus的addRegistration方法。在addLocalRegistration中,创建了HandlerHolder,并将其加入EventBus的成员变量handlerMap,然后返回创建的HandlerHolder
protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration,
                                               boolean replyHandler, boolean localOnly) {
    Objects.requireNonNull(registration.getHandler(), "handler");
    LocalRegistrationResult<T> result = addLocalRegistration(address, registration, replyHandler, localOnly);
    addRegistration(result.newAddress, address, replyHandler, localOnly, registration::setResult);
    return result.holder;
}

要点总结

  • consumer方法仅仅将给定的handler注册到EventBusImpl持有的handlerMap中,等待被消费。

send做了什么

//通过跟踪,最终会来到sendOrPubInternal,首先创建一个用于回复的HandlerRegistration,然后创建OutboundDeliveryContext,调用其next方法
public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
                                  Handler<AsyncResult<Message<T>>> replyHandler) {
    checkStarted();
    HandlerRegistration<T> replyHandlerRegistration = createReplyHandlerRegistration(message, options, replyHandler);
    OutboundDeliveryContext<T> sendContext = new OutboundDeliveryContext<>(message, options, replyHandlerRegistration);
    sendContext.next();
}
// createReplyHandlerRegistration方法创建了__vertx.reply.xxx地址的响应HandlerRegistration
private <T> HandlerRegistration<T> createReplyHandlerRegistration(MessageImpl message,
                                                                  DeliveryOptions options,
                                                                  Handler<AsyncResult<Message<T>>> replyHandler) {
    if (replyHandler != null) {
        long timeout = options.getSendTimeout();
        String replyAddress = generateReplyAddress();
        message.setReplyAddress(replyAddress);
        Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler);
        HandlerRegistration<T> registration =
            new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout);
        registration.handler(simpleReplyHandler);
        return registration;
    } else {
        return null;
    }
}
protected String generateReplyAddress() {
    return "__vertx.reply." + Long.toString(replySequence.incrementAndGet());
}
// OutboundDeliveryContext类接收了消息和响应HandlerRegistration,调用next,如下。其中的iter多半是拦截器,暂时不用管。核心在sendOrPub(this)和sendReply(this, replierMessage)
@Override
public void next() {
    if (iter.hasNext()) {
        Handler<DeliveryContext> handler = iter.next();
        try {
            if (handler != null) {
                handler.handle(this);
            } else {
                next();
            }
        } catch (Throwable t) {
            log.error("Failure in interceptor", t);
        }
    } else {
        if (replierMessage == null) {
            sendOrPub(this);
        } else {
            sendReply(this, replierMessage);
        }
    }
}
// 定义io.vertx.core.eventbus.impl.EventBusImpl#sendOrPub,再定位到io.vertx.core.eventbus.impl.EventBusImpl#deliverMessageLocally,最终来到io.vertx.core.eventbus.impl.EventBusImpl#deliverMessageLocally
// 这里的关键由两个地方:一是点对点的实现——再handlerMap中找到指定地址的handlers,只取第一个进行处理;还有发布订阅的实现——对在一个地址注册的handlers全部处理;第二个关键点是消息发送的方法deliverToHandler(msg, holder)
protected ReplyException deliverMessageLocally(MessageImpl msg) {
    msg.setBus(this);
    ConcurrentCyclicSequence<HandlerHolder> handlers = handlerMap.get(msg.address());
    if (handlers != null) {
        if (msg.isSend()) {
            //Choose one
            HandlerHolder holder = handlers.next();
            if (metrics != null) {
                metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), holder != null ? 1 : 0);
            }
            if (holder != null) {
                deliverToHandler(msg, holder);
                Handler<AsyncResult<Void>> handler = msg.writeHandler;
                if (handler != null) {
                    handler.handle(Future.succeededFuture());
                }
            }
        } else {
            // Publish
            if (metrics != null) {
                metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), handlers.size());
            }
            for (HandlerHolder holder: handlers) {
                deliverToHandler(msg, holder);
            }
            Handler<AsyncResult<Void>> handler = msg.writeHandler;
            if (handler != null) {
                handler.handle(Future.succeededFuture());
            }
        }
        return null;
    } else {
        ... ...
    }
}
// 最终的处理函数如下:创建InboundDeliveryContext,在HandlerHolder的context环境下运行其next方法:
private <T> void deliverToHandler(MessageImpl msg, HandlerHolder<T> holder) {
    // Each handler gets a fresh copy
    MessageImpl copied = msg.copyBeforeReceive();
    DeliveryContext<T> receiveContext = new InboundDeliveryContext<>(copied, holder);

    if (metrics != null) {
        metrics.scheduleMessage(holder.getHandler().getMetric(), msg.isLocal());
    }

    holder.getContext().runOnContext((v) -> {
        try {
            receiveContext.next();
        } finally {
            if (holder.isReplyHandler()) {
                holder.getHandler().unregister();
            }
        }
    });
}
// next方法啥也没干,直接将message传入目标handler
@Override
public void next() {
    if (iter.hasNext()) {
        // ... 拦截器迭代,忽略
    } else {
        holder.getHandler().handle(message);
    }
}

要点总结

  • send分为两步
    • 查询handler,调用send时马上执行,是同步的。
    • 执行handler,通过handler注册时的context执行,是异步的。
  • 消息响应的实现方式是注册一个响应handler到EventBus中,名为__vertx.reply.xxx,其中xxx为单调递增数字。
  • 如果同一地址注册了多个handler,则点对点传输模式下只会取第一个handler进行处理;发布模式下才会执行所有。
  • 在一个上下文中注册的handler,不管被执行时机如何,最终都会在该上下文中执行。参见:holder.getContext().runOnContext(...,hodler为HandlerHolder对象,在调用consumer注册时保存了注册上下文。

和EventLoop的关系在哪?

通过consumer和send看到了EventBus是如何协调接收和发送的,但并没有看到EventLoop是如何参与的。其实它是有参与的,在holder.getContext().runOnContext(...是进行了参与。

于是我们看看EventLoopContext.runOnContext(),如下。就是向Context保存的EventLoop对象提交一个任务即可。调度的事,交给Netty来做

// 看到只调用了一个executeAsync()
@Override
public void runOnContext(Handler<Void> task) {
    try {
        executeAsync(task);
    } catch (RejectedExecutionException ignore) {
        // Pool is already shut down
    }
}
// 这里就能看到Vertx的底了,它直接将任务提交给了netty的eventLoop
void executeAsync(Handler<Void> task) {
    nettyEventLoop().execute(() -> executeTask(null, task));
}

Verticle工作机制

Vert.x推荐使用Verticle进行开发,它是一个类Actor的模型,具有如下特点。

  • 同一Verticle下的所有操作均在一个EventLoop线程上执行。以此避免了线程安全问题。
  • Verticle之间通过EventBus进行消息传递
  • Verticle具有父子层级关系

一个典型的代码结构如下(官方starter使用Launcher启动的应用,本质上也是通过这种方式启动的)

class Verticle1 : AbstractVerticle() {
    override fun start() {
        println("Verticle 1 started")
    }
}

class Verticle2 : AbstractVerticle() {
    override fun start() {
        println("Verticle 2 started")
    }
}

fun main() {
    val vertx = Vertx.vertx();
    vertx.deployVerticle(Verticle1::class.java.canonicalName)
    vertx.deployVerticle(Verticle2::class.java.canonicalName)
}

我们需要探究的问题是

  • deployVerticle时发生了什么?
  • start()和stop()方法什么时候被调用?
  • 如何保证一个Verticle下的所有操作都在一个EventLoop线程上执行?
  • 父子层级关系如何维持?有什么作用?

要搞清楚这些问题,我们先看几个与此相关的类

Deployment

维护一个发布状态,父子状态也是由它维护的。其唯一实现类DeploymentImpl是作为DeploymentManager的私有内部类存在的。这意味着Verticle发布的所有操作都在DeploymentManager内完成。

其中可能需要解释的点是getVerticles(),这意味着一个Deployment可以有多个Verticle吗?一定程度上是,但仅当一个Verticle需要发布多个实例时,才会存在多个Verticle对象。

Vertx快速入门参考
其中需要重点关注的方法是io.vertx.core.impl.DeploymentManager.DeploymentImpl#doUndeployio.vertx.core.impl.DeploymentManager.DeploymentImpl#doUndeployChildren,两个方法递归调用,完成了指定Verticle及其子Verticle的取消。

public synchronized Future<Void> doUndeploy(ContextInternal undeployingContext) {
    if (status == ST_UNDEPLOYED) {
        return Future.failedFuture(new IllegalStateException("Already undeployed"));
    }
    // 子发布不为空,则先取消子发布,成功后再取消当前发布。
    if (!children.isEmpty()) {
        status = ST_UNDEPLOYING;
        return doUndeployChildren(undeployingContext).compose(v -> doUndeploy(undeployingContext));
    } else {
        // 子发布为空、或取消子发布完成,现在来取消当前发布
        status = ST_UNDEPLOYED;
        List<Future> undeployFutures = new ArrayList<>();
        if (parent != null) {
            parent.removeChild(this);
        }
        // 为当前发布的每个Verticle实例执行此操作
        for (VerticleHolder verticleHolder: verticles) {
            ContextImpl context = verticleHolder.context;
            Promise p = Promise.promise();
            undeployFutures.add(p.future());
            // 该context是Verticle发布时就存好的,调用它保证了Verticle的stop和start方法在同一个线程运行。
            context.runOnContext(v -> {
                Promise<Void> stopPromise = Promise.promise();
                Future<Void> stopFuture = stopPromise.future();
                stopFuture.setHandler(ar -> {
                    // 从deployments映射中移除
                    deployments.remove(deploymentID);
                    VertxMetrics metrics = vertx.metricsSPI();
                    if (metrics != null) {
                        metrics.verticleUndeployed(verticleHolder.verticle);
                    }
                    context.runCloseHooks(ar2 -> {
                        if (ar2.failed()) {
                            // Log error but we report success anyway
                            log.error("Failed to run close hook", ar2.cause());
                        }
                        if (ar.succeeded()) {
                            p.complete();
                        } else if (ar.failed()) {
                            p.fail(ar.cause());
                        }
                    });
                });
                try {
                    // 执行Verticle的stop方法
                    verticleHolder.verticle.stop(stopPromise);
                } catch (Throwable t) {
                    if (!stopPromise.tryFail(t)) {
                        undeployingContext.reportException(t);
                    }
                }
            });
        }
        Promise<Void> resolvingPromise = undeployingContext.promise();
        CompositeFuture.all(undeployFutures).<Void>mapEmpty().setHandler(resolvingPromise);
        return resolvingPromise.future();
    }
}

private synchronized Future<Void> doUndeployChildren(ContextInternal undeployingContext) {
    if (!children.isEmpty()) {
        List<Future> childFuts = new ArrayList<>();
        // 对每个子发布执行doUndeploy方法
        for (Deployment childDeployment: new HashSet<>(children)) {
            Promise<Void> p = Promise.promise();
            childFuts.add(p.future());
            childDeployment.doUndeploy(undeployingContext, ar -> {
                children.remove(childDeployment);
                p.handle(ar);
            });
        }
        return CompositeFuture.all(childFuts).mapEmpty();
    } else {
        return Future.succeededFuture();
    }
}

总结如下

  • 一个Verticle被取消,则其所有子Verticle都会被取消
  • VerticleHolder中存储了Verticle对应的Context,因此能够保证Verticle的所有生命周期方法都在同一个Context中执行。

DeploymentManager

DeploymentManager专门用于Verticle发布。

Vertx快速入门参考

重点方法在如下几个

  • DeploymentManager#doDeploy(DeploymentOptions, Function<Verticle,String>, ContextInternal, ContextInternal,ClassLoader, Callable<io.vertx.core.Verticle>)
  • DeploymentManager#undeployVerticle(String)

发布

发布代码如下

private Future<Deployment> doDeploy(String identifier,
                                    DeploymentOptions options,
                                    ContextInternal parentContext,
                                    ContextInternal callingContext,
                                    ClassLoader tccl, Verticle... verticles) {
    Promise<Deployment> promise = callingContext.promise();
    String poolName = options.getWorkerPoolName();

    Deployment parent = parentContext.getDeployment();
    // 生成发布ID
    String deploymentID = generateDeploymentID();
    // 创建Deployment对象,上面有说过它是干啥的
    DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);

    // 发布计数
    AtomicInteger deployCount = new AtomicInteger();
    // 失败标示
    AtomicBoolean failureReported = new AtomicBoolean();
    // 如果一个Verticle发布多个实例,则会有多个verticle对象
    for (Verticle verticle: verticles) {
        // Verticle可以被要求发布到Worker线程池还是EventLoop线程池,在这里做区分
        WorkerExecutorInternal workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()) : null;
        WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
        // 为每个Verticle都创建一个新的Context
        ContextImpl context = (ContextImpl) (options.isWorker() ? vertx.createWorkerContext(deployment, pool, tccl) :
                                             vertx.createEventLoopContext(deployment, pool, tccl));
        if (workerExec != null) {
            context.addCloseHook(workerExec);
        }
        // 向Deployment加入Verticle对象
        deployment.addVerticle(new VerticleHolder(verticle, context));
        // 在新创建的Context上执行Verticle生命周期
        context.runOnContext(v -> {
            try {
                // 执行init方法
                verticle.init(vertx, context);
                Promise<Void> startPromise = context.promise();
                Future<Void> startFuture = startPromise.future();
                // 执行start方法
                verticle.start(startPromise);
                startFuture.setHandler(ar -> {
                    if (ar.succeeded()) {
                        if (parent != null) {
                            // 发布成功,加入父节点
                            if (parent.addChild(deployment)) {
                                deployment.child = true;
                            } else {
                                // Orphan
                                deployment.undeploy(event -> promise.fail("Verticle deployment failed.Could not be added as child of parent verticle"));
                                return;
                            }
                        }
                        // 加入发布完成的map
                        deployments.put(deploymentID, deployment);
                        // 发布的数量和待发布的数量匹配,说明发布完成,成功结束
                        if (deployCount.incrementAndGet() == verticles.length) {
                            promise.complete(deployment);
                        }
                    } else if (failureReported.compareAndSet(false, true)) {
                        // 发布失败的回滚
                        deployment.rollback(callingContext, promise, context, ar.cause());
                    }
                });
            } catch (Throwable t) {
                if (failureReported.compareAndSet(false, true))
                    deployment.rollback(callingContext, promise, context, t);
            }
        });
    }

    return promise.future();
}

总结如下

  • 对每个verticle,vertx都会创建一个新的Context,因此每个verticle之间是相互独立的(一个Context代表了一个EventLoop线程。)
  • 传入init和start方法的vertx实例,是DeploymentManager中维护的,它是在Vertx.vertx()创建时赋予的,整个应用一个。
  • 整个verticle的内容都通过Context.runOnContext注册运行,所以它们才会始终都在一个线程上执行,并且执行顺序从上到下,不存在多线程竞争问题。
  • 发布完成的Deployment会被加入DeploymentManager维护的deployments映射中,方便进行查找和之后的使用。

取消发布

public Future<Void> undeployVerticle(String deploymentID) {
    // 从deployments中获取Deployment对象
    Deployment deployment = deployments.get(deploymentID);
    // 获取当前上下文
    Context currentContext = vertx.getOrCreateContext();
    if (deployment == null) {
        return ((ContextInternal) currentContext).failedFuture(new IllegalStateException("Unknown deployment"));
    } else {
        // 调用deployment的undeploy()
        return deployment.undeploy();
    }
}

Deployment.undeploy()在上面介绍Deployment时已介绍。

VerticleManager

DeploymentManager专注于发布,VerticleManager则主要专注于Verticle的创建。其内部持有一个DeploymentManager对象,用于执行实际的发布操作。

Vertx快速入门参考
该类中有两个主要逻辑

  • VerticleFactory的注册、取消、查找等。可以实现自定义的VerticleFactory,这里不深入。
  • Verticle的发布和创建的逻辑:调用VerticleFactory创建Verticle实例,在调用DeploymentManager.deploy()发布,代码过长,不给出。

所以Verticle是如何工作的?

这里回答最初提出的四个问题,就能解释Verticle是如何工作的。

  • deployVerticle时发生了什么?

    创建Verticle对象 -> 创建Context并和Verticle对象绑定 -> 构建Deployment并存起来 -> 执行init() -> 执行start() -> 完成

  • start()和stop()方法什么时候被调用?

    start(): 发布时,在新创建的Context上执行。

    stop(): 取消发布时,在与该Verticle绑定的Context上执行。

  • 如何保证一个Verticle下的所有操作都在一个EventLoop线程上执行?

    通过将Context和Verticle绑定,调用start()和stop()时均在该Context下执行;而在start()和stop()中调用vertx的大多数操作,均是在调用代码块的当前Context下执行,而一个Context始终对应同一个EventLoop线程,如此即能保证一个Verticle下的所有操作都在同一个EventLoop线程上执行。

  • 父子层级关系如何维持?有什么作用?

    通过Deployment对象记录并维持。作用在于关闭一个Verticle时,其子Verticle也会被依次关闭。

如此一来,Verticle几乎有了除容错机制外的所有的Actor模型的特性。

数据共享机制

Vertx提供了SharedData组件,用于为整个应用范围内提供共享组件,一个共享Map的使用大概如下

class Verticle1 : AbstractVerticle() {
    override fun start() {
        println("Verticle 1 started")
        vertx.sharedData().getLocalAsyncMap<String, String>("myMap").setHandler { ar ->
                                                                                 ar.result().put("你好", "我是Verticle1")
                                                                                }
    }
}

class Verticle2 : AbstractVerticle() {
    override fun start() {
        println("Verticle 2 started")
        vertx.sharedData().getLocalAsyncMap<String, String>("myMap").setHandler { ar ->
                                                                                 val value = ar.result().get("你好").result()
                                                                                 println(value)
                                                                                }
    }
}

fun main() {
    val vertx = Vertx.vertx();
    vertx.deployVerticle(Verticle1::class.java.canonicalName)
    Thread.sleep(1000)
    vertx.deployVerticle(Verticle2::class.java.canonicalName)
}

所有关于共享数据的内容都在io.vertx.core.shareddata包下,核心类是SharedDataImpl。

提供如下三种数据结构

  • io.vertx.core.shareddata.impl.LocalAsyncLocks

    异步排他锁,在集群内部有效的锁。其实现的思路如下

    • 维护一个ConcurrentMap,存储锁名和等待该锁的Handler列表
    • 每次新来一个获取锁的请求,向等待列表中加入。并启动定时器开始计算超时,超时后直接回调锁等待超时。

    至此加入等待列表的逻辑完成。然后是锁流转逻辑。采用被动的逻辑,非常节省复杂度。

    • 当等待列表为空时,来一个请求就将锁给它;列表不为空时,仅加入等待列表,不做尝试获取锁的操作。
    • 当一个锁被释放时,再主动将锁给等待列表的下一个请求。这样几乎从来不会出现竞争的情况。
  • io.vertx.core.shareddata.impl.AsynchronousCounter

    计数器,增减都是原子操作

  • io.vertx.core.shareddata.impl.LocalMapImpl

    本地Map,用于单个实例*享数据。仅是对ConcurrentMap的包装,没有其它特别之处。他的所有操作都是同步的。

  • io.vertx.core.shareddata.impl.LocalAsyncMapImpl

    异步Map,同样是对ConcurrentMap的包装。不同之处在于其value是Holder类,它封装了TTL,实现原理是调用vertx.setTimer设置一个TTL长度的定时器,过期移除。

    @Override
    public void put(K k, V v, long timeout, Handler<AsyncResult<Void>> completionHandler) {
        long timestamp = System.nanoTime();
        long timerId = vertx.setTimer(timeout, l -> removeIfExpired(k));
        Holder<V> previous = map.put(k, new Holder<>(v, timerId, timeout, timestamp));
        if (previous != null && previous.expires()) {
            vertx.cancelTimer(previous.timerId);
        }
        completionHandler.handle(Future.succeededFuture());
    }
    

    可能有顾虑设置太多定时器不好,但vertx其实是将定时任务加入eventLoop线程去执行,因此并不会增加额外成本

    public long setTimer(long delay, Handler<Long> handler) {
        return scheduleTimeout(getOrCreateContext(), handler, delay, false);
    }
    private long scheduleTimeout(ContextImpl context, Handler<Long> handler, long delay, boolean periodic) {
        if (delay < 1) {
            throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms");
        }
        long timerId = timeoutCounter.getAndIncrement();
        InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, delay, context);
        timeouts.put(timerId, task);
        context.addCloseHook(task);
        return timerId;
    }
    InternalTimerHandler(long timerID, Handler<Long> runnable, boolean periodic, long delay, ContextImpl context) {
        this.context = context;
        this.timerID = timerID;
        this.handler = runnable;
        this.periodic = periodic;
        EventLoop el = context.nettyEventLoop();
        if (periodic) {
            future = el.scheduleAtFixedRate(this, delay, delay, TimeUnit.MILLISECONDS);
        } else {
            future = el.schedule(this, delay, TimeUnit.MILLISECONDS);
        }
        if (metrics != null) {
            metrics.timerCreated(timerID);
        }
    }
    

框图

有待为每个工作原理都加上框图

总结

Vertx核心为EventBus、EventLoop,以及Verticle。这里通过先展示核心类的能力和实现原理,让读者有一个具象的认识,了解每个核心类大概有能干什么。然后通过EventBus的简单收发分析,展示了EventBus的工作原理及EventLoop参与代码执行的方式;通过Verticle的发布,展示了Verticle是如何运转的,以及Verticle的线程安全特性得到保障的原因;最后展示了SharedData进行应用范围内数据共享的实现原理。让读者对Vert.x核心部分有了较为深入的认识。

当然,Vert.x的能力远不止于此,这里仅介绍了单机版运行原理,它还支持集群和高可用特性,都是本文没有覆盖到的;此外,核心部分的文件系统、网络编程相关内容也均未介绍,这些留待之后再说。

最后,总结一波一些核心组件相互之间的关系。

  • 一般来说,一个应用只有一个Vertx,在整个应用中传来传去的vertx实例,都是一个,除非我们想要拥有完全隔离的EventBus。

  • 一个Vertx实例只持有一个EventBus和一个用于日常调度的EventLoopGroup(用于网络服务监听的不算)。

  • 一个Vertx实例持有多个线程池,我们最常解除的只有EventLoopGroup和WorkerPool。

  • 一个Context只持有一个EventLoop,即只对应一个线程。通过runOnContext()将任务调度到该EventLoop上执行。

  • 一个VerticleManager持有多个VerticleFactory。

  • 一个DeployManager持有多个Deployment,Deployment之间的父子关系由Deployment自己维护。

  • 一个Deployment可以持有多个Verticle实例,但仅能持有一个Verticle类型

    }
    if (metrics != null) {
        metrics.timerCreated(timerID);
    }
    
    
    

框图

有待为每个工作原理都加上框图

总结

Vertx核心为EventBus、EventLoop,以及Verticle。这里通过先展示核心类的能力和实现原理,让读者有一个具象的认识,了解每个核心类大概有能干什么。然后通过EventBus的简单收发分析,展示了EventBus的工作原理及EventLoop参与代码执行的方式;通过Verticle的发布,展示了Verticle是如何运转的,以及Verticle的线程安全特性得到保障的原因;最后展示了SharedData进行应用范围内数据共享的实现原理。让读者对Vert.x核心部分有了较为深入的认识。

当然,Vert.x的能力远不止于此,这里仅介绍了单机版运行原理,它还支持集群和高可用特性,都是本文没有覆盖到的;此外,核心部分的文件系统、网络编程相关内容也均未介绍,这些留待之后再说。

最后,总结一波一些核心组件相互之间的关系。

  • 一般来说,一个应用只有一个Vertx,在整个应用中传来传去的vertx实例,都是一个,除非我们想要拥有完全隔离的EventBus。
  • 一个Vertx实例只持有一个EventBus和一个用于日常调度的EventLoopGroup(用于网络服务监听的不算)。
  • 一个Vertx实例持有多个线程池,我们最常解除的只有EventLoopGroup和WorkerPool。
  • 一个Context只持有一个EventLoop,即只对应一个线程。通过runOnContext()将任务调度到该EventLoop上执行。
  • 一个VerticleManager持有多个VerticleFactory。
  • 一个DeployManager持有多个Deployment,Deployment之间的父子关系由Deployment自己维护。
  • 一个Deployment可以持有多个Verticle实例,但仅能持有一个Verticle类型

本文地址:https://blog.csdn.net/qq_18870127/article/details/110943712