多路转接IO/多路复用 select 、poll、 epoll
多路转接IO
1.多路复用--》多路转接
2.作用: I0多路转接可以完成大量描述符的监控,监控的事件:可读事件,可写事件,异常事件
3.当使用多路转接的时候,多路转接接口发现了某一一个文件描述符就绪的时候,就会通知进程,让进程针对某一个描述符进行操作;其他描述符继续监控;
4.好处:避免了进程对其他没有就绪的文件描述符进行操作,从而陷入阻塞的情况
select:
1.作用:对于大量的描述符进行用户关心事件的监控,如果有描述符就绪,则返回描述符,让用户对描述符进行操作;1.1将用户关系的描述符拷贝到内核,内核帮助用户进行监控
1.2如果内核监控到某个文件描述符事件就绪,则返回该描述符
1.3用户针对描述符进行操作
2.接口
int select(int nfds, fd_ set* readfds, fd_ set* writefds, fd_ set* exceptfds, strcut timeval* timeout);
nfds :取值为监控的最大描述符数值+1 作用: 提高selcet的监控效率
fd_ set :本质是一一个结构体,结构体内部是一个数组fds_ bits :__fd_ mask fds_ bits[__ FD_ SETSIZE /_ NFDBITS];
比特位的大小取决于宏_ FD_ SETSIZE
fd_ set 的使用方式是按照位图的方式来进行使用的
提供4个操作fd_ set位图的函数:
void FD_ _CLR(int fd, fd_ set *set); // 从事件集合当中删除某一个文件描述符
int FD_ ISSET(int fd, fd_ set *set);//判断fd描述符,是否在set集合当中 0 :表示没有在集合当中 非0 :表示在集合当中
void FD_ SET(int fd, fd_ _set *set); //设置fd ,到集合set当中
void FD_ ZERO(fd_ set *set); // 清空
timeout :超时时间
timeval == NULL :阻塞监控
timeval==0(tv_sec==0 tv__usec==0)非阻塞监控
timeval > 0 :带有超时时间的监控
返回值:
大于0 :则返回就绪的文件描述符的个数
等于0 :等待超时了
小于0 :监控出错
select返回的时候,会将没有就绪的文件描述符从集合当中去除掉;只返回就绪的文件描述符
例子:selectTCP
/*
1.创建侦听套接字
2.绑定地址信息
3.开始监听
4.将侦听套接字添加到select的可读事件集合当中
5. select监听
6.判断是侦听套接字来就绪,还是新的套接字就绪
6.1有新连接到来,select返回掉了
accept -->》新的套接字描述符-- 》添加到可读事件集合当中去
6.2有数据到来;recv接收数据
*/
//TcpSvr.hpp
#pragma once
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <iostream>
#include <string>
class TcpSvr
{
public:
TcpSvr()
{
sockfd_ = -1;
}
~TcpSvr()
{
}
//创建套接字
bool CreateSocket()
{
sockfd_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(sockfd_ < 0)
{
perror("socket");
return false;
}
int i = 1;
setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int));
return true;
}
//绑定地址信息
bool Bind(std::string& ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
int ret = bind(sockfd_, (struct sockaddr*)&addr, sizeof(addr));
if(ret < 0)
{
perror("bind");
return false;
}
return true;
}
//侦听
bool Listen(int backlog = 5)
{
int ret = listen(sockfd_, backlog);
if(ret < 0)
{
perror("listen");
return false;
}
return true;
}
//获取连接
//bool Accept(struct sockaddr_in* peeraddr, int* sockfd)
//peeraddr:出参,返回客户端的地址信息
//ts:出参,返回一个TcpSvr类的实例化指针,在这个类的实例化指针当中保存新创建出来的套接字描述符,上层调用者可以使用返回的类的实例化指针和客户端进行通信
bool Accept(struct sockaddr_in* peeraddr, TcpSvr* ts)
{
socklen_t addrlen = sizeof(struct sockaddr_in);
int serverfd = accept(sockfd_, (struct sockaddr*)peeraddr, &addrlen);
if(serverfd < 0)
{
perror("accept");
return false;
}
ts->sockfd_ = serverfd;
return true;
}
//发起连接(client)
bool Connect(std::string& ip, uint16_t port)
{
struct sockaddr_in destaddr;
destaddr.sin_family = AF_INET;
destaddr.sin_port = htons(port);
destaddr.sin_addr.s_addr = inet_addr(ip.c_str());
int ret = connect(sockfd_, (struct sockaddr*)&destaddr, sizeof(destaddr));
if(ret < 0)
{
perror("connect");
return false;
}
return true;
}
//发送数据
bool Send(std::string& data)
{
int sendsize = send(sockfd_, data.c_str(), data.size(), 0);
if(sendsize < 0)
{
perror("send");
return false;
}
return true;
}
//接收数据
//data:是一个出参,将接收到的数据返回给调用者
bool Recv(std::string* data)
{
char buf[1024] = {0};
int recvsize = recv(sockfd_, buf, sizeof(buf) - 1, 0);
if(recvsize < 0)
{
perror("recv");
return false;
}
else if(recvsize == 0)
{
printf("peer shutdown connect\n");
return false;
}
(*data).assign(buf, recvsize);
return true;
}
//关闭套接字
void Close()
{
close(sockfd_);
sockfd_ = -1;
}
void SetFd(int fd)
{
sockfd_ = fd;
}
int Getfd()
{
return sockfd_;
}
private:
int sockfd_;
};
//selectsvr.hpp
#pragma once
#include <stdio.h>
#include <unistd.h>
#include <sys/select.h>
#include <vector>
#include "tcpsvr.hpp"
class SelectSvr
{
public:
SelectSvr()
{
max_fd_ = -1;
FD_ZERO(&readfds_);
}
~SelectSvr()
{
}
void AddFd(int fd)
{
//1.添加
FD_SET(fd, &readfds_);
//2.更新最大文件描述符
if(fd > max_fd_)
{
max_fd_ = fd;
}
}
void DeleteFd(int fd)
{
//1.删除
FD_CLR(fd, &readfds_);
//2.更新最大文件描述符
for(int i = max_fd_; i >= 0; i--)
{
if(FD_ISSET(i, &readfds_) == 0) // 不在
continue;
max_fd_ = i;
break;
}
}
bool SelectWait(std::vector<TcpSvr>* vec)
{
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 3000;
fd_set tmp = readfds_;
int ret = select(max_fd_ + 1, &tmp, NULL, NULL, &tv);
if(ret < 0)
{
perror("select");
return false;
}
else if(ret == 0)
{
printf("select timeout\n");
return false;
}
//正常情况
for(int i = 0; i < max_fd_; i++)
{
if(FD_ISSET(i, &tmp))
{
//返回就绪的文件描述符 i 返回tcp类的对象
TcpSvr ts;
ts.SetFd(i);
vec->push_back(ts);
}
}
return true;
}
private:
int max_fd_;
fd_set readfds_;
};
//test.cpp
#include "SelectSvr.hpp"
#define CHECK_RET(p) if(p != true){return -1;}
int main()
{
TcpSvr listen_ts;
CHECK_RET(listen_ts.CreateSocket());
CHECK_RET(listen_ts.Bind("0.0.0.0", 19997));
CHECK_RET(listen_ts.Listen());
SelectSvr ss;
ss.AddFd(listen_ts.Getfd());
while(1)
{
//1.监控
std::vector<TcpSvr> vec;
if(!ss.SelectWait(&vec))
{
continue;
}
for(size_t i = 0; i < vec.size(); i++)
{
//2.接收新的连接
if(listen_ts.Getfd() == vec[i].Getfd())
{
struct sockaddr_in peeraddr;
TcpSvr peerts;
listen_ts.Accept(&peeraddr, &peerts);
printf("Have a new connection : [ip] : %s [port] : %d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
//将新创建出来的套接字添加到select的事件集合当中去
ss.AddFd(peerts.Getfd());
}
//3.连接上有数据
else
{
std::string data;
bool ret = vec[i].Recv(&data);
if(!ret)
{
ss.DeleteFd(vec[i].Getfd());
vec[i].Close();
}
printf("client send data is [%s]\n", data.c_str());
}
}
}
return 0;
}
select优缺点
优点:
1.select遵循的是posix标准,可以跨平台移植
2.select的超时时间可以精确到微妙
缺点:
1.select是轮询遍历的,监控的效率会随着文件描述符的增多而下降
2.select所能监控的文件描述符是有上限的,上限为1024, 取决于内核当中的_ FD_ SETSIZE宏的值
3.select监控文件描述符的时候,需要将集合拷贝到内核当中,监控到文件描述符上有事件就绪的时候,同样需要从内核当中拷贝到用户空间,效率会受到影响
4.select在返回就绪文件描述符的时候,会将未就绪的文件描述符移除掉,导致第:二次在去监控的时候,需要重新添加5.select没有直接告诉程序员哪一个文件描述符就绪了,需要程序员自己在返回的事件集合当中去判断
多路转接之poll
1.跨平台移植性不如select, poll只能在linux环境下使用,也是采用轮询遍历的方式,所以效率也没有明显的增加
2.相较于select改进的点
2.1不限制监控的文件描述符个数了
2.2文件描述符对应一个事件结构,告诉poll两件事情
2.2.1要监控的文件描述符
2.2.2关心文件描述符的事件
3.接口
int poll(struct pollfd *fds, nfds_ t nfds, int timeout);
fds:事件结构数组
struct pollfd {
int fd; /* file descriptor */关心的文件描述符是什么
short events; /* requested events */关心文件描述符产生什么事件
POLLIN :可读事件
POLLOUT :可写事件
文件描述符既关心可读事件,也关可写事件,应该将两者进行“按位或”连接,采用的是的位图方式。
eg: events = POLLIN | POLLOUT
short revents; /* returned events */当关心的文件描述符产生对应关心的事件时:将发生的时间放
到revents当中返回给调用者; revents在每次poll监控之初,就会被初始化为空
};
返回值:
小于0 : poll出错了
等于0 :监控超时
大于0 :就绪的文件描述符个数
#include <stdio.h>
#include <unistd.h>
#include <poll.h>
int main()
{
//1.定义事件结构数组
struct pollfd fd_arr[10];
fd_arr[0].fd = 0;
fd_arr[0].events = POLLIN;
//2.监控
while(1)
{
int ret = poll(fd_arr, 1, 1000);
if(ret < 0)
{
perror("poll");
return -1;
}
else if(ret == 0)
{
printf("timeout\n");
continue;
}
// 2.1 当产生可读事件的时候,处理事件
//fd_arr[10] fd_arr[0] fd_arr[1]
for(int i = 0; i < ret; i++)
{
if(fd_arr[i].revents == POLLIN)
{
char buf[1024] = {0};
read(fd_arr[i].fd, buf, sizeof(buf) - 1);
printf("read content is = %s\n", buf);
}
}
}
return 0;
}
poll优缺点:
优点:
pol采用了事件结构的方式.简化了代码的编写
poll不限制文件描述符的个数
也不需要重新添加文件描述符到事件结构数组当中了
缺点:
poll也是需要轮询遍历事件结构数组,随着文件描述符的增多,而性能下降
poll不支持跨平台
poll也没有告诉用户哪一个具体的文件描述符就绪了,需要程序员进行遍历
poll也是需要将时间结构数组从用户空间拷贝到内核,从内核拷贝到用户空间
多路转接之epoll
接口
1.创建epoll操作句柄
int epoll_ create(int size)
size :本来的含义是定义epoll最大能够监控的文件描述符个数
linux内核版本2.6.8之后,该参数size就已经被弃用了,内存现在采用的是扩容的方式size是不可以传入负数的!
从内核角度分析:在内核当中创建一个结构体, struct eventpoll结构体--》红黑树,双向链表返回值:返回epoll操作句柄,说白了,就是操作struct eventpoll结构体的的钥匙
2.操作epoll
int epoll_ ctl(int epfd, int op, int fd, struct epoll_ event *event);
epfd : epoll操作句柄
op(option) :想让epoll _ct|函数做什么事情
EPOLL _CTL. _ADD :添加一个文件描述符对应的事件结构到红黑树当中
EPOLL_ CTL _MOD :修改-个已经在红黑树当中的事件结构
EPOLL_ CTL. _DEL :从epoll的红黑树当中删除一个文件描述符对应的事件结构
fd :告诉epoll用户关心的文件描述符
event :类型是struct epoll_ event结构体,也是epoll的事件结构
struct epoll_ event {
uint32_ tevents; /* Epoll events */用户对文件描述符所关心的事件
EPOLLIN :可读事件
EPOLLOUT :可写事件
epoll_ data. t data; /* User data variable */
}
3.监控
int epoll _wait(int epfd, struct epoll_ event *events, int maxevents, int timeout);
epfd : epoll操作句柄
events : epoll事件结构数组--》出参,返回就绪的事件结构(每一个事件结构都对应-个文件描述符)maxevents :最大能够拷贝多少个事件结构
timeout:
大于0 :带有超时时间,单位为毫秒
等于0 :非阻塞
小于0 :阻塞
返回值:
大于0 :返回就绪的文件描述符个数
等于0 :等待超时
小于0 :监控出错
示例epollTCP
//tcpsvr.hpp
/*
TCP接口
1创建套接字
2绑定地址信息
3监听
4获取新连接
5接收数据
6发送数据
7关闭套C接字
*/
#pragma once
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <iostream>
#include <string>
class TcpSvr
{
public:
TcpSvr()
{
sockfd_ = -1;
}
~TcpSvr()
{
}
//创建套接字
bool CreateSocket()
{
sockfd_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(sockfd_ < 0)
{
perror("socket");
return false;
}
int i = 1;
setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int));
return true;
}
//绑定地址信息
bool Bind(const std::string& ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
int ret = bind(sockfd_, (struct sockaddr*)&addr, sizeof(addr));
if(ret < 0)
{
perror("bind");
return false;
}
return true;
}
//侦听
bool Listen(int backlog = 5)
{
int ret = listen(sockfd_, backlog);
if(ret < 0)
{
perror("listen");
return false;
}
return true;
}
//获取连接
//bool Accept(struct sockaddr_in* peeraddr, int* sockfd)
//peeraddr:出参,返回客户端的地址信息
//ts:出参,返回一个TcpSvr类的实例化指针,在这个类的实例化指针当中保存新创建出来的套接字描述符,上层调用者可以使用返回的类的实例化指针和客户端进行通信
bool Accept(struct sockaddr_in* peeraddr, TcpSvr* ts)
{
socklen_t addrlen = sizeof(struct sockaddr_in);
int serverfd = accept(sockfd_, (struct sockaddr*)peeraddr, &addrlen);
if(serverfd < 0)
{
perror("accept");
return false;
}
ts->sockfd_ = serverfd;
return true;
}
//发起连接(client)
bool Connect(std::string& ip, uint16_t port)
{
struct sockaddr_in destaddr;
destaddr.sin_family = AF_INET;
destaddr.sin_port = htons(port);
destaddr.sin_addr.s_addr = inet_addr(ip.c_str());
int ret = connect(sockfd_, (struct sockaddr*)&destaddr, sizeof(destaddr));
if(ret < 0)
{
perror("connect");
return false;
}
return true;
}
//发送数据
bool Send(std::string& data)
{
int sendsize = send(sockfd_, data.c_str(), data.size(), 0);
if(sendsize < 0)
{
perror("send");
return false;
}
return true;
}
//接收数据
//data:是一个出参,将接收到的数据返回给调用者
bool Recv(std::string* data)
{
char buf[1024] = {0};
int recvsize = recv(sockfd_, buf, sizeof(buf) - 1, 0);
if(recvsize < 0)
{
perror("recv");
return false;
}
else if(recvsize == 0)
{
printf("peer shutdown connect\n");
return false;
}
(*data).assign(buf, recvsize);
return true;
}
//关闭套接字
void Close()
{
close(sockfd_);
sockfd_ = -1;
}
void SetFd(int fd)
{
sockfd_ = fd;
}
int Getfd()
{
return sockfd_;
}
private:
int sockfd_;
};
//epollsvr.hpp
/*
epoll的封装epoll默认是水平触发模式 LTEpoll_ Svr
创建epoll操作句柄(放到构造函数当中去)epoll create
添加对应的事件结构到epoll当中
epoll_ ctl
从epol当中删除对应的事件结构
epoll_ ctl
监控
epoll_ _wait
成员变量:
int epollfd
*/
#pragma once
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <vector>
#include "tcpsvr.hpp"
class EpollSvr
{
public:
EpollSvr()
{
epoll_fd_ = -1;
}
~EpollSvr()
{
}
bool InitSvr()
{
epoll_fd_ = epoll_create(10);
if(epoll_fd_ < 0)
{
return false;
}
return true;
}
bool AddFd(int fd)
{
//1.组织事件结构
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
if(ret < 0)
{
perror("epoll_ctl");
return false;
}
return true;
}
bool DeleteFd(int fd)
{
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
if(ret < 0)
{
perror("epoll_ctl");
return false;
}
return true;
}
bool EpollWait(std::vector<TcpSvr>* out)
{
//10
struct epoll_event fd_arr[10];
int ret = epoll_wait(epoll_fd_, fd_arr, sizeof(fd_arr)/sizeof(fd_arr[0]), -1);
if(ret < 0)
{
perror("epoll_wait");
return false;
}
else if(ret == 0)
{
printf("epollwait timeout\n");
return false;
}
//防止数组越界
if(ret > sizeof(fd_arr)/sizeof(fd_arr[0]))
{
ret = sizeof(fd_arr)/sizeof(fd_arr[0]);
}
//剩下的逻辑都是ret大于0的逻辑了
for(int i = 0; i < ret; i++)
{
TcpSvr ts;
ts.SetFd(fd_arr[i].data.fd);
out->push_back(ts);
}
return true;
}
private:
//epoll操作句柄
int epoll_fd_;
};
//test.cpp
/*
1.创建侦听套接字
2.绑定地址信息
3.监听
4.将侦听套接字添加到epoll当中
5.监控 前提:监控的文件描述符产生了就绪事件
6.1获取新连接,将新连接的套接字添加到epoll当中
6.2新连接上有数据到来,获取数据
*/
#include "epoll_lt_svr.hpp"
#define CHECK_RET(p) if(p != true){return -1;}
int main()
{
TcpSvr listen_ts;
CHECK_RET(listen_ts.CreateSocket());
CHECK_RET(listen_ts.Bind("0.0.0.0", 19997));
CHECK_RET(listen_ts.Listen());
EpollSvr es;
CHECK_RET(es.InitSvr());
es.AddFd(listen_ts.Getfd());
while(1)
{
//1.监控
std::vector<TcpSvr> vec;
if(!es.EpollWait(&vec))
{
continue;
}
for(size_t i = 0; i < vec.size(); i++)
{
//2.接收新的连接
if(listen_ts.Getfd() == vec[i].Getfd())
{
struct sockaddr_in peeraddr;
TcpSvr peerts;
listen_ts.Accept(&peeraddr, &peerts);
printf("Have a new connection : [ip] : %s [port] : %d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
//将新创建出来的套接字添加到select的事件集合当中去
es.AddFd(peerts.Getfd());
}
//3.连接上有数据
else
{
std::string data;
bool ret = vec[i].Recv(&data);
if(!ret)
{
es.DeleteFd(vec[i].Getfd());
vec[i].Close();
}
printf("client send data is [%s]\n", data.c_str());
}
}
}
return 0;
}
epoll对描述符就绪事件的触发方式:
水平触发
EPOLLLT --> epoll默认工作方式,select和poll都是水平触发
可读事件:只要接收缓冲区当中的数据大于低水位标记( 1字节),就会一直触发可读事件就绪,直到接收缓冲区当中没有数据可读
可写事件:只要发送缓冲区当中的数据空间大小大于低水位标记( 1字节) , 就会-直触发可写事件就绪,直到发送缓冲区当中没有空间可写.
边缘触发(边沿触发)
EPOLLET --》只有epoll才支持
使用方式:只需要在文件描述符对应的事件结构当中的关心的事件按位或上EPOLLET就可以了
struct epoll_ event ev;
ev.events = EPOLLIN | EPOLLET;
可读事件:只有当新数据到来的时候,才会触发可读
可写事件:只有发送缓冲区剩余空间从不可写变成可写才会触发一次可写事件就绪
示例epoll_et
#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <string>
int main()
{
//设置文件描述符属性为非阻塞
int flag = fcntl(0, F_GETFL);
fcntl(0, F_SETFL, flag | O_NONBLOCK);
//1.创建epoll操作句柄
int epoll_fd = epoll_create(10);
if(epoll_fd < 0)
{
perror("epoll_create");
return 0;
}
//2.添加文件描述符
struct epoll_event ev;
//events 使用方式是按照位图的使用方式来使用的
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = 0;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, 0, &ev);
//3.监听
while(1)
{
struct epoll_event fd_arr[10];
int ret = epoll_wait(epoll_fd, fd_arr, sizeof(fd_arr)/sizeof(fd_arr[0]), -1);
if(ret < 0)
{
perror("epoll_wait");
//return 0;
continue;
}
for(int i = 0; i < ret; i++)
{
if(fd_arr[i].data.fd == 0)
{
std::string read_data;
while(1)
{
char buf[3] = {0};
ssize_t readsize = read(0, buf, sizeof(buf) - 1);
if(readsize < 0)
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
{
//说明数据读完了
goto myend;
}
perror("read");
return 0;
}
read_data += buf;
if(readsize < (ssize_t)sizeof(buf) - 1)
{
myend:
printf("read buf content is %s\n", read_data.c_str());
break;
}
}
}
}
}
//4.处理就绪事件
return 0;
}
按照读到数据的有效字节来判断
如果判断read或者recv的返回值比我们准备的buf的最大接收能力还小,说明读完了当我们读到的字节数量和buf的最大接收能力相等,无法判断后面是否还有数据的
ET加上循环,会造成饥饿状态!
解决方案:将文件描述符设置成为非阻塞状态
int fcntl(int fd, int cmd, ../* arg */ );
fd :操作哪-个文件描述符
cmd :告诉fcnt|函数做什么事情
F_ GETFL:获取文件描述符属性
F_ SETFD:设置文件描述符属性
上一篇: 梅子绿茶做法这里有