欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  后端开发

Nginx 中 upstream 机制的实现

程序员文章站 2022-05-15 11:50:07
...
概述

upstream 机制使得 Nginx 成为一个反向代理服务器,Nginx 接收来自下游客户端的 http 请求,并处理该请求,同时根据该请求向上游服务器发送 tcp 请求报文,上游服务器会根据该请求返回相应地响应报文,Nginx 根据上游服务器的响应报文,决定是否向下游客户端转发响应报文。另外 upstream 机制提供了负载均衡的功能,可以将请求负载均衡到集群服务器的某个服务器上面。

启动 upstream

在 Nginx 中调用 ngx_http_upstream_init 方法启动 upstream 机制,但是在使用 upstream 机制之前必须调用 ngx_http_upstream_create 方法创建 ngx_http_upstream_t 结构体,因为默认情况下 ngx_http_request_t 结构体中的 upstream 成员是指向 NULL,该结构体的具体初始化工作还需由 HTTP 模块完成。有关 ngx_http_upstream_t 结构体 和ngx_http_upstream_conf_t 结构体的相关说明可参考文章《Nginx 中 upstream 机制》。

下面是函数 ngx_http_upstream_create 的实现:

/* 创建 ngx_http_upstream_t 结构体 */
ngx_int_t
ngx_http_upstream_create(ngx_http_request_t *r)
{
    ngx_http_upstream_t  *u;

    u = r->upstream;

    /*
     * 若已经创建过ngx_http_upstream_t 且定义了cleanup成员,
     * 则调用cleanup清理方法将原始结构体清除;
     */
    if (u && u->cleanup) {
        r->main->count++;
        ngx_http_upstream_cleanup(r);
    }

    /* 从内存池分配ngx_http_upstream_t 结构体空间 */
    u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t));
    if (u == NULL) {
        return NGX_ERROR;
    }

    /* 给ngx_http_request_t 结构体成员upstream赋值 */
    r->upstream = u;

    u->peer.log = r->connection->log;
    u->peer.log_error = NGX_ERROR_ERR;
#if (NGX_THREADS)
    u->peer.lock = &r->connection->lock;
#endif

#if (NGX_HTTP_CACHE)
    r->cache = NULL;
#endif

    u->headers_in.content_length_n = -1;

    return NGX_OK;
}

关于 upstream 机制的启动方法 ngx_http_upstream_init 的执行流程如下:

  • 检查 Nginx 与下游服务器之间连接上的读事件是否在定时器中,即检查 timer_set 标志位是否为 1,若该标志位为 1,则把读事件从定时器中移除;
  • 调用 ngx_http_upstream_init_request 方法启动 upstream 机制;

ngx_http_upstream_init_request 方法执行流程如下所示:

  • 检查 ngx_http_upstream_t 结构体中的 store 标志位是否为 0;检查 ngx_http_request_t 结构体中的 post_action 标志位是否为0;检查 ngx_http_upstream_conf_t 结构体中的ignore_client_abort 是否为 0;若上面的标志位都为 0,则设置ngx_http_request_t 请求的读事件的回调方法为ngx_http_upstream_rd_check_broken_connection;设置写事件的回调方法为 ngx_http_upstream_wr_check_broken_connection;这两个方法都会调用 ngx_http_upstream_check_broken_connection方法检查 Nginx 与下游之间的连接是否正常,若出现错误,则终止连接;
  • 若不满足上面的标志位,即至少有一个不为 0 ,调用请求中ngx_http_upstream_t 结构体中某个 HTTP 模块实现的create_request 方法,构造发往上游服务器的请求;
  • 调用 ngx_http_cleanup_add 方法向原始请求的 cleanup 链表尾端添加一个回调 handler 方法,该回调方法设置为ngx_http_upstream_cleanup,若当前请求结束时会调用该方法做一些清理工作;
  • 调用 ngx_http_upstream_connect 方法向上游服务器发起连接请求;
/* 初始化启动upstream机制 */
void
ngx_http_upstream_init(ngx_http_request_t *r)
{
    ngx_connection_t     *c;

    /* 获取当前请求所对应的连接 */
    c = r->connection;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http init upstream, client timer: %d", c->read->timer_set);

#if (NGX_HTTP_SPDY)
    if (r->spdy_stream) {
        ngx_http_upstream_init_request(r);
        return;
    }
#endif

    /*
     * 检查当前连接上读事件的timer_set标志位是否为1,若该标志位为1,
     * 表示读事件在定时器机制中,则需要把它从定时器机制中移除;
     * 因为在启动upstream机制后,就不需要对客户端的读操作进行超时管理;
     */
    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {

        if (!c->write->active) {
            if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT)
                == NGX_ERROR)
            {
                ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
        }
    }

    ngx_http_upstream_init_request(r);
}
static void
ngx_http_upstream_init_request(ngx_http_request_t *r)
{
    ngx_str_t                      *host;
    ngx_uint_t                      i;
    ngx_resolver_ctx_t             *ctx, temp;
    ngx_http_cleanup_t             *cln;
    ngx_http_upstream_t            *u;
    ngx_http_core_loc_conf_t       *clcf;
    ngx_http_upstream_srv_conf_t   *uscf, **uscfp;
    ngx_http_upstream_main_conf_t  *umcf;

    if (r->aio) {
        return;
    }

    u = r->upstream;

#if (NGX_HTTP_CACHE)
    ...
    ...
#endif

    /* 文件缓存标志位 */
    u->store = (u->conf->store || u->conf->store_lengths);

    /*
     * 检查ngx_http_upstream_t 结构中标志位 store;
     * 检查ngx_http_request_t 结构中标志位 post_action;
     * 检查ngx_http_upstream_conf_t 结构中标志位 ignore_client_abort;
     * 若上面这些标志位为1,则表示需要检查Nginx与下游(即客户端)之间的TCP连接是否断开;
     */
    if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
        r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
        r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
    }

    /* 把当前请求包体结构保存在ngx_http_upstream_t 结构的request_bufs链表缓冲区中 */
    if (r->request_body) {
        u->request_bufs = r->request_body->bufs;
    }

    /* 调用create_request方法构造发往上游服务器的请求 */
    if (u->create_request(r) != NGX_OK) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* 获取ngx_http_upstream_t结构中主动连接结构peer的local本地地址信息 */
    u->peer.local = ngx_http_upstream_get_local(r, u->conf->local);

    /* 获取ngx_http_core_module模块的loc级别的配置项结构 */
    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    /* 初始化ngx_http_upstream_t结构中成员output向下游发送响应的方式 */
    u->output.alignment = clcf->directio_alignment;
    u->output.pool = r->pool;
    u->output.bufs.num = 1;
    u->output.bufs.size = clcf->client_body_buffer_size;
    u->output.output_filter = ngx_chain_writer;
    u->output.filter_ctx = &u->writer;

    u->writer.pool = r->pool;

    /* 添加用于表示上游响应的状态,例如:错误编码、包体长度等 */
    if (r->upstream_states == NULL) {

        r->upstream_states = ngx_array_create(r->pool, 1,
                                            sizeof(ngx_http_upstream_state_t));
        if (r->upstream_states == NULL) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

    } else {

        u->state = ngx_array_push(r->upstream_states);
        if (u->state == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
    }

    /*
     * 调用ngx_http_cleanup_add方法原始请求的cleanup链表尾端添加一个回调handler方法,
     * 该handler回调方法设置为ngx_http_upstream_cleanup,若当前请求结束时会调用该方法做一些清理工作;
     */
    cln = ngx_http_cleanup_add(r, 0);
    if (cln == NULL) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    cln->handler = ngx_http_upstream_cleanup;
    cln->data = r;
    u->cleanup = &cln->handler;

    if (u->resolved == NULL) {

        /* 若没有实现u->resolved标志位,则定义上游服务器的配置 */
        uscf = u->conf->upstream;

    } else {

        /*
         * 若实现了u->resolved标志位,则解析主机域名,指定上游服务器的地址;
         */


        /*
         * 若已经指定了上游服务器地址,则不需要解析,
         * 直接调用ngx_http_upstream_connection方法向上游服务器发起连接;
         * 并return从当前函数返回;
         */
        if (u->resolved->sockaddr) {

            if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
                != NGX_OK)
            {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            ngx_http_upstream_connect(r, u);

            return;
        }

        /*
         * 若还没指定上游服务器的地址,则需解析主机域名;
         * 若成功解析出上游服务器的地址和端口号,
         * 则调用ngx_http_upstream_connection方法向上游服务器发起连接;
         */
        host = &u->resolved->host;

        umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

        uscfp = umcf->upstreams.elts;

        for (i = 0; i upstreams.nelts; i++) {

            uscf = uscfp[i];

            if (uscf->host.len == host->len
                && ((uscf->port == 0 && u->resolved->no_port)
                     || uscf->port == u->resolved->port)
                && ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
            {
                goto found;
            }
        }

        if (u->resolved->port == 0) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "no port in upstream \"%V\"", host);
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        temp.name = *host;

        ctx = ngx_resolve_start(clcf->resolver, &temp);
        if (ctx == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        if (ctx == NGX_NO_RESOLVER) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "no resolver defined to resolve %V", host);

            ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
            return;
        }

        ctx->name = *host;
        ctx->handler = ngx_http_upstream_resolve_handler;
        ctx->data = r;
        ctx->timeout = clcf->resolver_timeout;

        u->resolved->ctx = ctx;

        if (ngx_resolve_name(ctx) != NGX_OK) {
            u->resolved->ctx = NULL;
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

found:

    if (uscf == NULL) {
        ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
                      "no upstream configuration");
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    if (uscf->peer.init(r, uscf) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_http_upstream_connect(r, u);
}

static void
ngx_http_upstream_rd_check_broken_connection(ngx_http_request_t *r)
{
    ngx_http_upstream_check_broken_connection(r, r->connection->read);
}


static void
ngx_http_upstream_wr_check_broken_connection(ngx_http_request_t *r)
{
    ngx_http_upstream_check_broken_connection(r, r->connection->write);
}

建立连接

upstream 机制与上游服务器建立 TCP 连接时,采用的是非阻塞模式的套接字,即发起连接请求之后立即返回,不管连接是否建立成功,若没有立即建立成功,则需在 epoll 事件机制中监听该套接字。向上游服务器发起连接请求由函数ngx_http_upstream_connect 实现。在分析 ngx_http_upstream_connect 方法之前,首先分析下 ngx_event_connect_peer 方法,因为该方法会被ngx_http_upstream_connect 方法