欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Unix环境高级编程多路复用之Select的基本实现

程序员文章站 2022-06-14 10:59:42
...

select函数简介

select()函数允许进程指示内核等待多个事件(文件描述符)中的任何一个发生,并只在有一个或多个事件发生或经历一段指定时间后才唤醒它,然后接下来判断究竟是哪个文件描述符发生了事件并进行相应的处理。


#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
 
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);   
 
void FD_CLR(int fd, fd_set *set);  //用来删除一个已经没有使用的文件描述符fd
int  FD_ISSET(int fd, fd_set *set); //判断文件描述是否在集合中
void FD_SET(int fd, fd_set *set);   //将文件描述符fd加入的select的文件描述符集合
void FD_ZERO(fd_set *set);          //将select的文件描述符集合清空
 
/*可以用来设置select的超时时间*/
struct timeval { 
    long tv_sec;   //秒 
    long tv_usec;  //毫秒

参数说明

  1. select函数的返回值是就绪描述符的数目,超时时返回0,出错返回-1;
  2. 第一个参数max_fd指待测试的fd的总个数,它的值是待测试的最大文件描述符加1。Linux内核从0开始到max_fd-1扫描文件描述符,如果有数据出现事件(读、写、异常)将会返回;假设需要监测的文件描述符是8,9,10,那么Linux内核实际也要监测07,此时真正带测试的文件描述符是010总共11个,即max(8,9,10)+1,所以第一个参数是所有要监听的文件描述符中最大的+1。
  3. 中间三个参数readset、writeset和exceptset指定要让内核测试读、写和异常条件的fd集合,如果不需要测试的可以设置为NULL;
  4. 最后一个参数是设置select的超时时间,如果设置为NULL则永不超时;
    需要注意的是待测试的描述集总是从0, 1, 2, …开始的。 所以, 假如你要检测的描述符为8, 9, 10, 那么系统实际也要监测0, 1, 2, 3, 4, 5, 6, 7, 此时真正待测试的描述符的个数为11个, 也就是max(8, 9, 10) + 1
    在Linux内核有个参数__FD_SETSIZE定义了每个FD_SET的句柄个数中,这也意味着select所用到的FD_SET是有限的,也正是这个原因select()默认只能同时处理1024个客户端的连接请求:
/linux/posix_types.h:
 #define __FD_SETSIZE 1024

select的不足之处

1.select能监听的文件描述符个数受限于FD_SETSIZE,一般为1024,单纯改变进程打开的文件描述符个数并不能改变select监听文件个数,解决1024以下客户端时使用select是很合适的,但如果链接客户端过多,select采用的是轮询模型,会大大降低服务器响应效率,不应在select上投入更多精力。

2.每次调⽤用select,都需要把fd集合从⽤用户态拷贝到内核态,这个开销在fd很多时会很⼤大。

3.同时每次调⽤用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很⼤大。

select的多路复用实现网络socket的多并发服务器的流程图

Unix环境高级编程多路复用之Select的基本实现

代码示例

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <ctype.h>
#include <time.h>
#include <pthread.h>
#include <getopt.h>
#include <libgen.h>
#include <sys/types.h> 
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>



#define ARRAY_SIZE(x) (sizeof(x)/sizeof(x[0]))
#define  MSG_NUM  1024

void print_usage(char *progname)
{
        printf("%s usage: \n", progname);
        printf("-p(--LISTEN_PORT): sepcify server port.\n");
        printf("-h(--Help): print this help information.\n");
        return ;
}
static inline void msleep(unsigned long ms);


int main(int argc,char **argv)
{
      int    sockfd = -1;
      int    clifd ;
      int    rv  =-1;
      int    LISTEN_PORT ;
      int    rw;
      int    on =1;
      int    i,j;
      int    found;
      int    fds_array[1024];
      int    maxfd=0;
      fd_set rdset;
      int    f;
      char   buf[MSG_NUM];
      struct sockaddr_in     servaddr;
      struct sockaddr_in     cliaddr;
      pthread_t tid;
      socklen_t              cli_len=sizeof(struct sockaddr);  
      struct option longopts[] = {
          {"help", no_argument, NULL, 'h'},
          {"LISTEN_PORT", required_argument, NULL, 'P'},
          {0, 0, 0, 0}
          };
 
        while( (rw=getopt_long(argc, argv, "p:h", longopts, NULL)) != -1 )
         {
             switch(rw)
             {
                 case 'p':
                     LISTEN_PORT=atoi(optarg);
                     break;
                 case 'h':
                     print_usage(argv[0]);
                     return 0;       
             }   
         }
        if( !LISTEN_PORT )
        {
             print_usage(argv[0]);
             return 0;
         }
      sockfd=socket(AF_INET, SOCK_STREAM,0);
      if(sockfd<0)
      {
         printf("Create socket failure:%s\n",strerror(errno));
         return 0;
      }
      printf("Create socket[%d] successfully!\n",sockfd);
      setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
      memset(&servaddr, 0, sizeof(servaddr));
      servaddr.sin_family=AF_INET;
      servaddr.sin_port = htons(LISTEN_PORT);
      servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
      if(bind(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0)
      {
         printf("Bind socket failure:%s\n ",strerror(errno));
         return -1;   
      }
      printf("socket[%d] bind on port[%d] for all IP address ok\n",sockfd,LISTEN_PORT);


      listen(sockfd,13);
      printf("Start to listen on port [%d]\n", LISTEN_PORT);

  for(i=0; i<ARRAY_SIZE(fds_array) ; i++)
      {
         fds_array[i]=-1;
      }
   fds_array[0] = sockfd;

   for ( ; ; )
   {
       FD_ZERO(&rdset);
       for(i=0; i<ARRAY_SIZE(fds_array) ; i++)
          {
             if( fds_array[i] < 0 )
             continue;
      
             maxfd = fds_array[i]>maxfd ? fds_array[i] : maxfd;
             FD_SET(fds_array[i], &rdset);
          }
      rv = select(maxfd+1, &rdset, NULL, NULL, NULL);
      if(rv < 0)
         {
            printf("select failure: %s\n", strerror(errno));
            break;
         }
      else if(rv == 0)
         {
            printf("select get timeout\n");
            continue;
         }
       if ( FD_ISSET(sockfd, &rdset) )
         {
             printf("Start accept new client incoming...\n");
             if( (clifd=accept(sockfd, (struct sockaddr *)NULL, NULL)) < 0)
             {
                 printf("accept new client failure: %s\n", strerror(errno));
                 continue;
             }
          found = 0;
          for(i=0; i<ARRAY_SIZE(fds_array) ; i++)
          {
               if( fds_array[i] < 0 )
               {
                  printf("accept new client[%d] and add it into array\n", clifd );
                  fds_array[i] = clifd;
                  found = 1;
                  break;
                }
          }
                if( !found )
                {
                    printf("accept new client[%d] but full, so refuse it\n", clifd);
                    close(clifd);
                }
        
        }
       else
       {
           for(i=0; i<ARRAY_SIZE(fds_array); i++)
              {
                  if( fds_array[i]<0 || !FD_ISSET(fds_array[i], &rdset) )
                  continue;
                  if( (rv=read(fds_array[i], buf, sizeof(buf))) <= 0)
                  {
                      printf("socket[%d] read failure or get disconncet.\n", fds_array[i]);
                      close(fds_array[i]);
                      fds_array[i] = -1;
                  }
                  else
                  {
                     printf("socket[%d] read get %d bytes data\n", fds_array[i], rv);
                     for(j=0; j<rv; j++)
                     buf[j]=toupper(buf[j]);
                     if( write(fds_array[i], buf, rv) < 0 )
                     {
                        printf("socket[%d] write failure: %s\n", fds_array[i], strerror(errno));
                        close(fds_array[i]);
                        fds_array[i] = -1;
                     }
                   }
              }
       }  
   }
    close(sockfd);
    return 0;
}
static inline void msleep(unsigned long ms)
{
    struct timeval tv;
    tv.tv_sec = ms/1000;
    tv.tv_usec = (ms%1000)*1000;
    select(0, NULL, NULL, NULL, &tv);
}