Nginx 中 upstream 机制的实现
upstream 机制使得 Nginx 成为一个反向代理服务器,Nginx 接收来自下游客户端的 http 请求,并处理该请求,同时根据该请求向上游服务器发送 tcp 请求报文,上游服务器会根据该请求返回相应地响应报文,Nginx 根据上游服务器的响应报文,决定是否向下游客户端转发响应报文。另外 upstream 机制提供了负载均衡的功能,可以将请求负载均衡到集群服务器的某个服务器上面。
启动 upstream
在 Nginx 中调用 ngx_http_upstream_init 方法启动 upstream 机制,但是在使用 upstream 机制之前必须调用 ngx_http_upstream_create 方法创建 ngx_http_upstream_t 结构体,因为默认情况下 ngx_http_request_t 结构体中的 upstream 成员是指向 NULL,该结构体的具体初始化工作还需由 HTTP 模块完成。有关 ngx_http_upstream_t 结构体 和ngx_http_upstream_conf_t 结构体的相关说明可参考文章《Nginx 中 upstream 机制》。
下面是函数 ngx_http_upstream_create 的实现:
/* 创建 ngx_http_upstream_t 结构体 */ ngx_int_t ngx_http_upstream_create(ngx_http_request_t *r) { ngx_http_upstream_t *u; u = r->upstream; /* * 若已经创建过ngx_http_upstream_t 且定义了cleanup成员, * 则调用cleanup清理方法将原始结构体清除; */ if (u && u->cleanup) { r->main->count++; ngx_http_upstream_cleanup(r); } /* 从内存池分配ngx_http_upstream_t 结构体空间 */ u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t)); if (u == NULL) { return NGX_ERROR; } /* 给ngx_http_request_t 结构体成员upstream赋值 */ r->upstream = u; u->peer.log = r->connection->log; u->peer.log_error = NGX_ERROR_ERR; #if (NGX_THREADS) u->peer.lock = &r->connection->lock; #endif #if (NGX_HTTP_CACHE) r->cache = NULL; #endif u->headers_in.content_length_n = -1; return NGX_OK; }
关于 upstream 机制的启动方法 ngx_http_upstream_init 的执行流程如下:
- 检查 Nginx 与下游服务器之间连接上的读事件是否在定时器中,即检查 timer_set 标志位是否为 1,若该标志位为 1,则把读事件从定时器中移除;
- 调用 ngx_http_upstream_init_request 方法启动 upstream 机制;
ngx_http_upstream_init_request 方法执行流程如下所示:
- 检查 ngx_http_upstream_t 结构体中的 store 标志位是否为 0;检查 ngx_http_request_t 结构体中的 post_action 标志位是否为0;检查 ngx_http_upstream_conf_t 结构体中的ignore_client_abort 是否为 0;若上面的标志位都为 0,则设置ngx_http_request_t 请求的读事件的回调方法为ngx_http_upstream_rd_check_broken_connection;设置写事件的回调方法为 ngx_http_upstream_wr_check_broken_connection;这两个方法都会调用 ngx_http_upstream_check_broken_connection方法检查 Nginx 与下游之间的连接是否正常,若出现错误,则终止连接;
- 若不满足上面的标志位,即至少有一个不为 0 ,调用请求中ngx_http_upstream_t 结构体中某个 HTTP 模块实现的create_request 方法,构造发往上游服务器的请求;
- 调用 ngx_http_cleanup_add 方法向原始请求的 cleanup 链表尾端添加一个回调 handler 方法,该回调方法设置为ngx_http_upstream_cleanup,若当前请求结束时会调用该方法做一些清理工作;
- 调用 ngx_http_upstream_connect 方法向上游服务器发起连接请求;
/* 初始化启动upstream机制 */ void ngx_http_upstream_init(ngx_http_request_t *r) { ngx_connection_t *c; /* 获取当前请求所对应的连接 */ c = r->connection; ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, "http init upstream, client timer: %d", c->read->timer_set); #if (NGX_HTTP_SPDY) if (r->spdy_stream) { ngx_http_upstream_init_request(r); return; } #endif /* * 检查当前连接上读事件的timer_set标志位是否为1,若该标志位为1, * 表示读事件在定时器机制中,则需要把它从定时器机制中移除; * 因为在启动upstream机制后,就不需要对客户端的读操作进行超时管理; */ if (c->read->timer_set) { ngx_del_timer(c->read); } if (ngx_event_flags & NGX_USE_CLEAR_EVENT) { if (!c->write->active) { if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT) == NGX_ERROR) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } } ngx_http_upstream_init_request(r); }
static void ngx_http_upstream_init_request(ngx_http_request_t *r) { ngx_str_t *host; ngx_uint_t i; ngx_resolver_ctx_t *ctx, temp; ngx_http_cleanup_t *cln; ngx_http_upstream_t *u; ngx_http_core_loc_conf_t *clcf; ngx_http_upstream_srv_conf_t *uscf, **uscfp; ngx_http_upstream_main_conf_t *umcf; if (r->aio) { return; } u = r->upstream; #if (NGX_HTTP_CACHE) ... ... #endif /* 文件缓存标志位 */ u->store = (u->conf->store || u->conf->store_lengths); /* * 检查ngx_http_upstream_t 结构中标志位 store; * 检查ngx_http_request_t 结构中标志位 post_action; * 检查ngx_http_upstream_conf_t 结构中标志位 ignore_client_abort; * 若上面这些标志位为1,则表示需要检查Nginx与下游(即客户端)之间的TCP连接是否断开; */ if (!u->store && !r->post_action && !u->conf->ignore_client_abort) { r->read_event_handler = ngx_http_upstream_rd_check_broken_connection; r->write_event_handler = ngx_http_upstream_wr_check_broken_connection; } /* 把当前请求包体结构保存在ngx_http_upstream_t 结构的request_bufs链表缓冲区中 */ if (r->request_body) { u->request_bufs = r->request_body->bufs; } /* 调用create_request方法构造发往上游服务器的请求 */ if (u->create_request(r) != NGX_OK) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } /* 获取ngx_http_upstream_t结构中主动连接结构peer的local本地地址信息 */ u->peer.local = ngx_http_upstream_get_local(r, u->conf->local); /* 获取ngx_http_core_module模块的loc级别的配置项结构 */ clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); /* 初始化ngx_http_upstream_t结构中成员output向下游发送响应的方式 */ u->output.alignment = clcf->directio_alignment; u->output.pool = r->pool; u->output.bufs.num = 1; u->output.bufs.size = clcf->client_body_buffer_size; u->output.output_filter = ngx_chain_writer; u->output.filter_ctx = &u->writer; u->writer.pool = r->pool; /* 添加用于表示上游响应的状态,例如:错误编码、包体长度等 */ if (r->upstream_states == NULL) { r->upstream_states = ngx_array_create(r->pool, 1, sizeof(ngx_http_upstream_state_t)); if (r->upstream_states == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } else { u->state = ngx_array_push(r->upstream_states); if (u->state == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t)); } /* * 调用ngx_http_cleanup_add方法原始请求的cleanup链表尾端添加一个回调handler方法, * 该handler回调方法设置为ngx_http_upstream_cleanup,若当前请求结束时会调用该方法做一些清理工作; */ cln = ngx_http_cleanup_add(r, 0); if (cln == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } cln->handler = ngx_http_upstream_cleanup; cln->data = r; u->cleanup = &cln->handler; if (u->resolved == NULL) { /* 若没有实现u->resolved标志位,则定义上游服务器的配置 */ uscf = u->conf->upstream; } else { /* * 若实现了u->resolved标志位,则解析主机域名,指定上游服务器的地址; */ /* * 若已经指定了上游服务器地址,则不需要解析, * 直接调用ngx_http_upstream_connection方法向上游服务器发起连接; * 并return从当前函数返回; */ if (u->resolved->sockaddr) { if (ngx_http_upstream_create_round_robin_peer(r, u->resolved) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_http_upstream_connect(r, u); return; } /* * 若还没指定上游服务器的地址,则需解析主机域名; * 若成功解析出上游服务器的地址和端口号, * 则调用ngx_http_upstream_connection方法向上游服务器发起连接; */ host = &u->resolved->host; umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module); uscfp = umcf->upstreams.elts; for (i = 0; i upstreams.nelts; i++) { uscf = uscfp[i]; if (uscf->host.len == host->len && ((uscf->port == 0 && u->resolved->no_port) || uscf->port == u->resolved->port) && ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0) { goto found; } } if (u->resolved->port == 0) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no port in upstream \"%V\"", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } temp.name = *host; ctx = ngx_resolve_start(clcf->resolver, &temp); if (ctx == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } if (ctx == NGX_NO_RESOLVER) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no resolver defined to resolve %V", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY); return; } ctx->name = *host; ctx->handler = ngx_http_upstream_resolve_handler; ctx->data = r; ctx->timeout = clcf->resolver_timeout; u->resolved->ctx = ctx; if (ngx_resolve_name(ctx) != NGX_OK) { u->resolved->ctx = NULL; ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } found: if (uscf == NULL) { ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, "no upstream configuration"); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } if (uscf->peer.init(r, uscf) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_http_upstream_connect(r, u); } static void ngx_http_upstream_rd_check_broken_connection(ngx_http_request_t *r) { ngx_http_upstream_check_broken_connection(r, r->connection->read); } static void ngx_http_upstream_wr_check_broken_connection(ngx_http_request_t *r) { ngx_http_upstream_check_broken_connection(r, r->connection->write); }
建立连接
upstream 机制与上游服务器建立 TCP 连接时,采用的是非阻塞模式的套接字,即发起连接请求之后立即返回,不管连接是否建立成功,若没有立即建立成功,则需在 epoll 事件机制中监听该套接字。向上游服务器发起连接请求由函数
上一篇: vim(gvim)配合ZendCodeAnalyzer检查php语法
下一篇: 水仙花开