欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

nginx的filter的处理 博客分类: 服务器设计 nginxCC++C#Blog 

程序员文章站 2024-03-22 14:08:40
...
随笔拿一个nginx的filter模块来看,gzip模块,来看它的初始化。

static ngx_http_output_header_filter_pt  ngx_http_next_header_filter;
static ngx_http_output_body_filter_pt    ngx_http_next_body_filter;

static ngx_int_t
ngx_http_gzip_filter_init(ngx_conf_t *cf)
{
    ngx_http_next_header_filter = ngx_http_top_header_filter;
    ngx_http_top_header_filter = ngx_http_gzip_header_filter;

    ngx_http_next_body_filter = ngx_http_top_body_filter;
    ngx_http_top_body_filter = ngx_http_gzip_body_filter;

    return NGX_OK;
}



这里nginx处理filter将所有的过滤器做成一个类似链表的东东,每次声明一个ngx_http_next_header_filter以及ngx_http_next_body_filter来保存当前的最前面的filter,然后再将自己的filter处理函数赋值给ngx_http_top_header_filter以及ngx_http_top_body_filter ,这样也就是说最后面初始化的filter反而是最早处理。

而在模块本身的filter处理函数中会调用ngx_http_next_header_filter,也就是当前filter插入前的那个最top上的filter处理函数。

然后我们来看nginx如何启动filter的调用。

先来看head_filter的调用:

ngx_int_t
ngx_http_send_header(ngx_http_request_t *r)
{
    if (r->err_status) {
        r->headers_out.status = r->err_status;
        r->headers_out.status_line.len = 0;
    }

    return ngx_http_top_header_filter(r);
}


可以看到当发送header的时候就是调用ngx_http_top_header_filter,nginx这里把status这些也作为一个filter模块来处理的。当启动ngx_http_top_header_filter之后所有的filter处理函数就会象链表一样被一个个的调用。


然后是body filter的调用,这个和header的类似,因此就不解释了。
ngx_int_t
ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;

    c = r->connection;

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http output filter \"%V?%V\"", &r->uri, &r->args);

//启动body filter。
    rc = ngx_http_top_body_filter(r, in);
..............................................................

    return rc;
}


这里还有一个问题,那就是最后一个ngx_http_top_header_filter和ngx_http_top_body_filter是什么呢?也就是第一个被插入的filter。

先来看filter被初始化的地方。这里filter的初始化是在ngx_http_block函数中:

for (m = 0; ngx_modules[m]; m++) {
        if (ngx_modules[m]->type != NGX_HTTP_MODULE) {
            continue;
        }

        module = ngx_modules[m]->ctx;

//如果存在postconfiguratio则调用初始化。
        if (module->postconfiguration) {
            if (module->postconfiguration(cf) != NGX_OK) {
                return NGX_CONF_ERROR;
            }
        }
    }


代码很简单就是遍历ngx_modules然后调用初始化函数,而我们这里要找第一个filter,也就是ngx_modules中的第一个bodyfilter和header filter。

来看objs/ngx_modules.c中的ngx_module的定义:

ngx_module_t *ngx_modules[] = {
 ..............................................................
    &ngx_http_write_filter_module,
    &ngx_http_header_filter_module,
........................................................................
    NULL
};


可以看到ngx_http_write_filter_module和ngx_http_header_filter_module分别是body filter和header filter的第一个初始化模块,也就是filter链中的最后一个模块。

接下来我们就来详细分析这两个模块,首先是ngx_http_write_filter_module模块。

这个模块的功能起始很简单,就是遍历chain,然后输出所有的数据,如果有设置flush的话刷新chain。

这里要注意ngx_http_request_t中有一个out的chain,这个chain保存的是上一次还没有被发完的buf,这样每次我们接收到新的chain的话,就需要将新的chain连接到老的out chain上,然后再发出去。

来看代码。

ngx_int_t
ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in)


第一个是request请求,第二个参数是输入的chain。

先来看初始化部分:


  
 off_t                      size, sent, nsent, limit;
    ngx_uint_t                 last, flush;
    ngx_msec_t                 delay;
    ngx_chain_t               *cl, *ln, **ll, *chain;
    ngx_connection_t          *c;
    ngx_http_core_loc_conf_t  *clcf;

//得到当前所属的连接
    c = r->connection;

    if (c->error) {
        return NGX_ERROR;
    }

    size = 0;
    flush = 0;
    last = 0;
//得到上次没有发送完毕的chain
    ll = &r->out;


然后接下来这部分是校验并统计out chain,也就是上次没有完成的chain buf。
 
 
 for (cl = r->out; cl; cl = cl->next) {
        ll = &cl->next;

#if 1
//如果有0长度的buf则返回错误。
        if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
 ......................................................................

            ngx_debug_point();
            return NGX_ERROR;
        }
#endif

//得到buf的大小
        size += ngx_buf_size(cl->buf);
//看当传输完毕后是否要刷新buf。
        if (cl->buf->flush || cl->buf->recycled) {
            flush = 1;
        }
//看是否是最后一个buf
        if (cl->buf->last_buf) {
            last = 1;
        }
    }


接下来这部分是用来链接新的chain到上面的out chain后面:

for (ln = in; ln; ln = ln->next) {
//
        cl = ngx_alloc_chain_link(r->pool);
        if (cl == NULL) {
            return NGX_ERROR;
        }

        cl->buf = ln->buf;
//前面的代码我们知道ll已经指向out chain的最后一个位置了,因此这里就是将新的chain链接到out chain的后面。
        *ll = cl;
        ll = &cl->next;
#if 1
//校验buf
        if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {

            ngx_debug_point();
            return NGX_ERROR;
        }
#endif

//计算大小
        size += ngx_buf_size(cl->buf);

//判断是否需要flush
        if (cl->buf->flush || cl->buf->recycled) {
            flush = 1;
        }

//判断是否是最后一个buf
        if (cl->buf->last_buf) {
            last = 1;
        }
    }


然后接下来的这段代码主要是对进行发送前buf的一些标记的处理。

在看代码之前先来解释下几个比较重要的标记。

第一个是ngx_http_core_module的conf的一个标记postpone_output(conf里面可以配置的),这个表示延迟输出的阀,也就是说将要发送的字节数如果小于这个的话,并且还有另外几个条件的话(下面会解释),就会直接返回不发送当前的chain。

第二个是c->write->delayed,这个表示当前的连接的写必须要被delay了,也就是说现在不能发送了(原因下面会解释),得等另外的地方取消了delayed才能发送,此时我们修改连接的buffered的标记,然后返回NGX_AGAIN.

第三个是c->buffered,因为有时buf并没有发完,因此我们有时就会设置buffed标记,而我们可能会在多个filter模块中被buffered,因此下面就是buffered的类型。

//这个并没有用到
#define NGX_HTTP_LOWLEVEL_BUFFERED         0xf0
//主要是这个,这个表示在最终的write filter中被buffered
#define NGX_HTTP_WRITE_BUFFERED            0x10
//判断是否有被设置
#define NGX_LOWLEVEL_BUFFERED  0x0f

//下面几个filter中被buffered
#define NGX_HTTP_GZIP_BUFFERED             0x20
#define NGX_HTTP_SSI_BUFFERED              0x01
#define NGX_HTTP_SUB_BUFFERED              0x02
#define NGX_HTTP_COPY_BUFFERED             0x04



然后我们来看第二个的意思,这个表示当前的chain已经被buffered了,


第四个是r->limit_rate,这个表示当前的request的发送限制速率,这个也是在nginx.conf中配置的,而一般就是通过这个值来设置c->write->delayed的。也就是说如果发送速率大于这个limit了的话,就设置delayed,然后这边的request就会延迟发送,下面我们的代码会看到nginx如何处理。

  

 clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

//也就是说将要发送的字节数小于postpone_output并且不是最后一个buf,并且不需要刷新chain的话,就直接返回。
    if (!last && !flush && in && size < (off_t) clcf->postpone_output) {
        return NGX_OK;
    }

///如果设置了write的delayed,则设置标记。
    if (c->write->delayed) {
        c->buffered |= NGX_HTTP_WRITE_BUFFERED;
        return NGX_AGAIN;
    }

//如果size为0,并且没有设置buffered标记,则进入清理工作。
    if (size == 0 && !(c->buffered & NGX_LOWLEVEL_BUFFERED)) {
//如果是最后一个buf,则清理buffered标记然后清理out chain
        if (last) {
            r->out = NULL;
            c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;

            return NGX_OK;
        }

//如果有设置flush的话,则会强行传输当前buf之前的所有buf,因此这里就需要清理out chain。
        if (flush) {
            do {
                r->out = r->out->next;
            } while (r->out);

//清理buf 标记
            c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;

            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, c->log, 0,
                      "the http output chain is empty");

        ngx_debug_point();

        return NGX_ERROR;
    }

//如果有发送速率限制。
    if (r->limit_rate) {
//计算是否有超过速率限制
        limit = r->limit_rate * (ngx_time() - r->start_sec + 1)
                - (c->sent - clcf->limit_rate_after);
//如果有
        if (limit <= 0) {
//设置delayed标记
            c->write->delayed = 1;
//设置定时器
            ngx_add_timer(c->write,
                          (ngx_msec_t) (- limit * 1000 / r->limit_rate + 1));

//设置buffered。
            c->buffered |= NGX_HTTP_WRITE_BUFFERED;

            return NGX_AGAIN;
        }

    } else if (clcf->sendfile_max_chunk) {
//sendfile所用到的limit。
        limit = clcf->sendfile_max_chunk;

    } else {
        limit = 0;
    }

    sent = c->sent;


然后接下来这段就是发送buf,以及发送完的处理部分。这里要注意send_chain返回值为还没有发送完的chain,这个函数我后面的blog会详细的分析的。


//调用发送函数。
chain = c->send_chain(c, r->out, limit);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http write filter %p", chain);

    if (chain == NGX_CHAIN_ERROR) {
        c->error = 1;
        return NGX_ERROR;
    }

//控制imit_rate,这个值一般是在nginx.conf中配置的。
    if (r->limit_rate) {

        nsent = c->sent;

        if (clcf->limit_rate_after) {

            sent -= clcf->limit_rate_after;
            if (sent < 0) {
                sent = 0;
            }

            nsent -= clcf->limit_rate_after;
            if (nsent < 0) {
                nsent = 0;
            }
        }

        delay = (ngx_msec_t) ((nsent - sent) * 1000 / r->limit_rate + 1);

        if (delay > 0) {
            c->write->delayed = 1;
            ngx_add_timer(c->write, delay);
        }

    } else if (c->write->ready
               && clcf->sendfile_max_chunk
               && (size_t) (c->sent - sent)
                      >= clcf->sendfile_max_chunk - 2 * ngx_pagesize)
    {
        c->write->delayed = 1;
        ngx_add_timer(c->write, 1);
    }

//开始遍历上一次还没有传输完毕的chain,如果这次没有传完的里面还有的话,就跳出循环,否则free这个chain
    for (cl = r->out; cl && cl != chain; /* void */) {
        ln = cl;
        cl = cl->next;
        ngx_free_chain(r->pool, ln);
    }

///out chain赋值
    r->out = chain;

//如果chain存在,则设置buffered并且返回again。
    if (chain) {
        c->buffered |= NGX_HTTP_WRITE_BUFFERED;
        return NGX_AGAIN;
    }

//否则清理buffered
    c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;

//如果有其他的filter buffered并且postponed被设置了,则我们返回again,也就是还有buf要处理。
    if ((c->buffered & NGX_LOWLEVEL_BUFFERED) && r->postponed == NULL) {
        return NGX_AGAIN;
    }

//否则返回ok
    return NGX_OK;



然后我们来看ngx_http_header_filter_module模块,这个模块的处理函数是ngx_http_header_filter。这个函数最终还是会调用ngx_http_write_filter来将head输出。

这个函数主要就是处理http的头域,然后设置对应的reponse值,最终输出。

这里header filter比较简单,这里没有什么复杂的东西,主要就是设置一些status。然后拷贝,最后通过ngx_http_write_filter进行发送。