测试Lighttpd accept的惊群现象
程序员文章站
2024-01-12 18:19:16
...
lighttpd里面采用的是prefork的模型,在fork进程之前就已经创建好了listen socket
那么fork了进程池之后,所有进程都有一份自己独立的listen socket fd,
但实际上这个独立的fd 对应的确是一个文件表项,即实际上任然是一个共享的文件描述符
在阻塞模型中,各进程分别通过accept阻塞,等待连接到达,当一个连接到达时,所有的进程都会被唤醒,但只有其中一个进程可以成功accept该连接,其余的则继续投入睡眠,这就是所谓的惊群现象
lighttpd使用的是非阻塞IO复用模型,测试一下是否会有惊群现象呢?
先把结论给出:
1.比如有20个进程注册了listen socket的请求连接事件,当一个连接到达确实会有多个进程 被通知有事件要处理(但不是全部,大约只有5,6个进程)
2.被唤醒的这几个进程会调用accept函数,其中只有一个成功返回连接fd,其余进程均返回EAGAIN或者 EWOULDBLOCK错误(因为是非阻塞的)
测试方法,自己写了一个prefork进程 + epoll的非阻塞server,启动20个进程,client telnet,打印服务器日志
try to accept new connection,pid=29879
try to accept new connection,pid=29876
try to accept new connection,pid=29880
process 29879 accept connection
accept EAGAIN error pid=29876
try to accept new connection,pid=29875
accept EAGAIN error pid=29880
accept EAGAIN error pid=29875
四个进程被通知有事件处理,1个成功accept,3个返回EAGAIN
、
在lighttpd中,server当被通知有连接要处理时,server会通过循环执行
accept,直到返回错误,或者超过一个上限值
这样,当海量请求连接到达时,似乎惊群不会带来太多的性能损耗。
部分测试代码
base.h
server.c
那么fork了进程池之后,所有进程都有一份自己独立的listen socket fd,
但实际上这个独立的fd 对应的确是一个文件表项,即实际上任然是一个共享的文件描述符
在阻塞模型中,各进程分别通过accept阻塞,等待连接到达,当一个连接到达时,所有的进程都会被唤醒,但只有其中一个进程可以成功accept该连接,其余的则继续投入睡眠,这就是所谓的惊群现象
lighttpd使用的是非阻塞IO复用模型,测试一下是否会有惊群现象呢?
先把结论给出:
1.比如有20个进程注册了listen socket的请求连接事件,当一个连接到达确实会有多个进程 被通知有事件要处理(但不是全部,大约只有5,6个进程)
2.被唤醒的这几个进程会调用accept函数,其中只有一个成功返回连接fd,其余进程均返回EAGAIN或者 EWOULDBLOCK错误(因为是非阻塞的)
测试方法,自己写了一个prefork进程 + epoll的非阻塞server,启动20个进程,client telnet,打印服务器日志
try to accept new connection,pid=29879
try to accept new connection,pid=29876
try to accept new connection,pid=29880
process 29879 accept connection
accept EAGAIN error pid=29876
try to accept new connection,pid=29875
accept EAGAIN error pid=29880
accept EAGAIN error pid=29875
四个进程被通知有事件处理,1个成功accept,3个返回EAGAIN
、
在lighttpd中,server当被通知有连接要处理时,server会通过循环执行
accept,直到返回错误,或者超过一个上限值
这样,当海量请求连接到达时,似乎惊群不会带来太多的性能损耗。
部分测试代码
base.h
enum conn_states { conn_listening, /** the socket which listens for connections */ conn_read, /** reading in a command line */ conn_write, /** writing out a simple response */ conn_nread, /** reading in a fixed number of bytes */ conn_swallow, /** swallowing unnecessary bytes w/o storing */ conn_closing, /** closing this connection */ conn_mwrite, /** writing out many items sequentially */ }; typedef struct{ int fd; int state; }conn;
server.c
#include<stdio.h> #include<unistd.h> #include<sys/types.h> #include<stdlib.h> #include<sys/socket.h> #include<errno.h> #include<fcntl.h> #include<netinet/in.h> #include<sys/resource.h> #include "event.h" #include "base.h" //forward declaration static fdevents *ev; static conn **conns; static int freetotal; static int freecurr; static int create_listen_fd(char *addr,int port); static int conn_init(); static conn *get_conn_from_freelist(); static int add_conn_to_freelist(conn *c); conn *conn_new(int fd,int state); static int conn_init(){ freetotal=200; freecurr=0; conns=(conn **)malloc(freetotal * sizeof(*conns)); if(!conns){ return -1; } return 0; } static conn *get_conn_from_freelist(){ conn *con; if(freecurr > 0){ con=conns[--freecurr]; conns[freecurr]=NULL; return con; } return NULL; } static int add_conn_to_freelist(conn *c){ if(freecurr<freetotal){ conns[freecurr++]=c; return 0; }else{ conn **new_conns=(conn **)realloc(conns,sizeof(*new_conns)*2*freetotal); if(new_conns){ freetotal*=2; conns=new_conns; conns[freecurr++]=c; return 0; } } return -1; } conn *conn_new(int fd,int state){ conn *c; c=get_conn_from_freelist(); if(!c){ c=(conn *)malloc(sizeof(*c)); } c->fd=fd; c->state=state; return c; } static int create_listen_fd(char *addr,int port){ int fd,val,flags; struct sockaddr_in sockaddr; fd=socket(AF_INET,SOCK_STREAM,0); if(fd==-1){ fprintf(stderr,"socket()\n"); return -1; } val=1; if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(val))<0){ fprintf(stderr,"reuseaddr\n"); return -1; } if((flags=fcntl(fd,F_GETFL,0)<0) || fcntl(fd,F_SETFL,flags | O_NONBLOCK) < 0){ fprintf(stderr,"nonblocking\n"); return -1; } bzero(&sockaddr,sizeof(sockaddr)); sockaddr.sin_family=AF_INET; sockaddr.sin_port=htons(port); inet_pton(AF_INET,addr,&sockaddr.sin_addr); if(bind(fd,(struct sockaddr *)&sockaddr,sizeof(sockaddr))<0){ fprintf(stderr,"bind error %s",strerror(errno)); return -1; } if(listen(fd,2048)<0){ fprintf(stderr,"listen %s",strerror(errno)); return -1; } return fd; } void event_handler(int fd,void *ctx, int revents){ struct sockaddr_in addr; socklen_t sock_len; int done=0,connfd; conn *c; c=(conn *)ctx; while(!done){ switch(c->state){ case conn_listening: printf("try to accept new connection,pid=%d\n",getpid()); sock_len=sizeof(addr); connfd=accept(fd,(struct sockaddr *)&addr,&sock_len); if(connfd>0){ printf("process %d accept connection\n",getpid()); c = conn_new(connfd,conn_read); fdevent_register(ev,connfd,event_handler,c); fdevent_event_add(ev,connfd,FDEVENT_IN); }else{ if(errno== EAGAIN || errno == EWOULDBLOCK){ printf("accept EAGAIN error pid=%d\n",getpid()); } if(errno==EINTR){ printf("accept EINTR error pid=%d\n",getpid()); } if(errno==ECONNABORTED){ /* this is a FreeBSD thingy */ printf("accept EABORTED error pid=%d\n",getpid()); } if(errno==EMFILE){ printf("accept EMFILE error pid=%d\n",getpid()); } } done=1; break; case conn_read: printf("on read"); break; } } } int main(int argc,char **argv){ int fd,o; char *listen_addr; int port,num_childs,max_fds; struct rlimit rlim; conn *c; port=0; num_childs=5; while(-1!=(o=getopt(argc,argv,"l:p:f:h"))){ switch(o){ case 'l': listen_addr=strdup(optarg); break; case 'p': port=atoi(optarg); break; case 'f': num_childs=atoi(optarg); break; case 'h': printf("Usage -l listen addr\n"); printf("Usage -p listen port \n"); printf("Usage -f fork num\n"); exit(1); } } if(!listen_addr){ listen_addr=strdup("127.0.0.1"); } if(!port){ printf("port is unknown\n"); exit(1); } if(0 != getrlimit(RLIMIT_NOFILE,&rlim)){ fprintf(stderr,"getrlimit failed.reason %s\n",strerror(errno)); exit(1); } max_fds=rlim.rlim_cur; //create listen socket if(-1==(fd=create_listen_fd(listen_addr,port))){ fprintf(stderr,"create listen fd failed\n"); exit(1); } //prefork child if(num_childs > 0){ int child=0; while(!child){ if(num_childs >0){ switch(fork()){ case -1: return -1; case 0: child=1; break; default: num_childs--; break; } }else{ int status; if(-1 !=wait(&status)){ num_childs++; }else{ //ignore } } } } //child process event conn_init(); c=conn_new(fd,conn_listening); ev=fdevent_init(max_fds); if(!ev){ fprintf(stderr,"fdevent_init()\n"); exit(1); } fdevent_register(ev,fd,event_handler,c); fdevent_event_add(ev,fd,FDEVENT_IN); fdevent_poll(ev,1000); }
上一篇: Spring AOP 详解
下一篇: Memcached源码分析(线程模型)