reactor模式C++实现
程序员文章站
2022-04-12 21:30:40
copy from github上的一份实现。。。找不到链接了。。。
* epoll主要负责fd到event类型的映射
* EventDemultiplexer管理fd <...
copy from github上的一份实现。。。找不到链接了。。。
* epoll主要负责fd到event类型的映射
* EventDemultiplexer管理fd <-> event类型 <-> eventhandler具体怎么做event的回调方法,从而间接实现fd <–event类型–> eventhandler
的具体回调方法方法
* Reactor负责注册、管理、分配
核心代码
reactor.h#include "event_handler.h" #include "event.h" #include "reactor_impl.h" class ReactorImpl; // 为了隐藏具体实现么。。 class Reactor { public: static Reactor& get_instance(); int regist(EventHandler* handler, Event evt); void remove(EventHandler* handler); void dispatch(int timeout = 0); private: Reactor(); ~Reactor(); Reactor(const Reactor&); Reactor& operator=(const Reactor&); private: ReactorImpl* _reactor_impl; static Reactor reactor; };reactor.cpp
#include "reactor.h" #includereactor_impl.h#include Reactor Reactor::reactor; Reactor& Reactor::get_instance() { return reactor; } Reactor::Reactor() { _reactor_impl = new (std::nothrow)ReactorImpl(); assert(_reactor_impl != NULL); } Reactor::~Reactor() { if (_reactor_impl != NULL) { delete _reactor_impl; _reactor_impl = NULL; } } int Reactor::regist(EventHandler* handler, Event evt) { return _reactor_impl->regist(handler, evt); } void Reactor::remove(EventHandler* handler) { return _reactor_impl->remove(handler); } void Reactor::dispatch(int timeout) { return _reactor_impl->dispatch(timeout); }
#includereactor_impl.cpp
#include "reactor_impl.h" #includeevent.h#include #include "epoll_demultiplexer.h" ReactorImpl::ReactorImpl() { _demultiplexer = new (std::nothrow)EpollDemultiplexer(); assert(_demultiplexer != NULL); } ReactorImpl::~ReactorImpl() { std::map ::iterator iter = _handlers.begin(); for(; iter != _handlers.end(); ++iter) { delete iter->second; } if (_demultiplexer != NULL) { delete _demultiplexer; } } int ReactorImpl::regist(EventHandler* handler, Event evt) { int handle = handler->get_handle(); if (_handlers.find(handle) == _handlers.end()) { _handlers.insert(std::make_pair(handle, handler)); } return _demultiplexer->regist(handle, evt); } void ReactorImpl::remove(EventHandler* handler) { int handle = handler->get_handle(); // not check? _demultiplexer->remove(handle); std::map ,>::iterator iter = _handlers.find(handle); delete iter->second; _handlers.erase(iter); } void ReactorImpl::dispatch(int timeout) { _demultiplexer->wait_event(_handlers, timeout); } ,>
typedef unsigned int Event; enum EventMask { ReadEvent = 0x01, WriteEvent = 0x02, ErrorEvent = 0x04, EventMask = 0xff };event_demultiplexer.h
#includeepoll_demultiplexer.h
#includeepoll_demultiplexer.cpp
#includeevent_handler.h#include #include #include #include #include "epoll_demultiplexer.h" EpollDemultiplexer::EpollDemultiplexer() : _max_fd(0) { _epoll_fd = epoll_create(1024); } EpollDemultiplexer::~EpollDemultiplexer() { close(_epoll_fd); } int EpollDemultiplexer::wait_event(std::map & handlers, int timeout) { std::vector ,>events(_max_fd); int num = epoll_wait(_epoll_fd, &events[0], _max_fd, timeout); if (num < 0) { //std::cerr << "WARNING: epoll_wait error " << errno << std::endl; return num; } for (int i = 0; i < num; ++i) { Handle handle = events[i].data.fd; if ((EPOLLHUP|EPOLLERR) & events[i].events) { assert(handlers[handle] != NULL); (handlers[handle])->handle_error(); } else { if ((EPOLLIN) & events[i].events) { assert(handlers[handle] != NULL); (handlers[handle])->handle_read(); } if (EPOLLOUT & events[i].events) { (handlers[handle])->handle_write(); } } } return num; } int EpollDemultiplexer::regist(Handle handle, Event evt) { struct epoll_event ev; ev.data.fd = handle; if (evt & ReadEvent) { ev.events |= EPOLLIN; } if (evt & WriteEvent) { ev.events |= EPOLLOUT; } ev.events |= EPOLLET; if (epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, handle, &ev) != 0) { if (errno == ENOENT) { if (epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, handle, &ev) != 0) { std::cerr << "epoll_ctl add error " << errno << std::endl; return -errno; } ++_max_fd; } else { ++_max_fd; } } return 0; } int EpollDemultiplexer::remove(Handle handle) { struct epoll_event ev; if (epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, handle, &ev) != 0) { std::cerr << "epoll_ctl del error" << errno << std::endl; return -errno; } --_max_fd; return 0; }
typedef int Handle; class EventHandler { public: EventHandler() {} virtual ~EventHandler() {} virtual Handle get_handle() const = 0; virtual void handle_read() = 0; virtual void handle_write() = 0; virtual void handle_error() = 0; };listen_handler.h
#include "event_handler.h" #include "event.h" class ListenHandler : public EventHandler { public: ListenHandler(Handle fd); virtual ~ListenHandler(); virtual Handle get_handle() const { return _listen_fd; } virtual void handle_read(); virtual void handle_write(); virtual void handle_error(); private: Handle _listen_fd; };listen_handler.cpp
#include "listen_handler.h" #includesocket_handler.h#include #include #include #include #include "event_handler.h" #include "reactor.h" #include "socket_handler.h" ListenHandler::ListenHandler(Handle fd) : _listen_fd(fd) { // do nothing } ListenHandler::~ListenHandler() { close(_listen_fd); } void ListenHandler::handle_read() { int fd = accept(_listen_fd, NULL, NULL); EventHandler* h = new (std::nothrow)SocketHandler(fd); assert(h != NULL); Reactor& r = Reactor::get_instance(); r.regist(h, ReadEvent); } void ListenHandler::handle_write() { // do nothing } void ListenHandler::handle_error() { // do nothing }
#include "event_handler.h" #include "event.h" class SocketHandler : public EventHandler { public: SocketHandler(Handle fd); virtual ~SocketHandler(); virtual Handle get_handle() const { return _socket_fd; } virtual void handle_read(); virtual void handle_write(); virtual void handle_error(); private: Handle _socket_fd; char* _buf; static const int MAX_SIZE = 1024; };socket_handler.cpp
#include "socket_handler.h" #include#include #include #include #include #include #include #include "reactor.h" SocketHandler::SocketHandler(Handle fd) : _socket_fd(fd) { _buf = new (std::nothrow)char[MAX_SIZE]; assert(_buf != NULL); memset(_buf, 0, MAX_SIZE); } SocketHandler::~SocketHandler() { close(_socket_fd); delete[] _buf; } void SocketHandler::handle_read() { if (read(_socket_fd, _buf, MAX_SIZE) > 0) { write(_socket_fd, _buf, strlen(_buf)); } handle_error(); } void SocketHandler::handle_write() { // do nothing } void SocketHandler::handle_error() { Reactor& r = Reactor::get_instance(); r.remove(this); }
demo
client.cpp#include "sys/socket.h" #includeserver.cpp#include #include #include #include "reactor.h" #include "event_handler.h" #include "listen_handler.h" #include "event.h" int main() { int socketfd = -1; if ( (socketfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { std::cerr << "socket error " << errno << std::endl; exit(-1); } struct sockaddr_in seraddr; seraddr.sin_family = AF_INET; seraddr.sin_port = htons(53031); seraddr.sin_addr.s_addr = inet_addr("127.0.0.1"); // TODO if (connect(socketfd, (struct sockaddr*)&seraddr, sizeof(seraddr)) < 0) { std::cerr << "connect error " << errno << std::endl; exit(-2); } char wbuf[64] = {0}; strcpy(wbuf, "hello world"); int n = write(socketfd, wbuf, strlen(wbuf)); char rbuf[64] = {0}; memset(rbuf, 0, sizeof(rbuf)); n = read(socketfd, rbuf, sizeof(rbuf)); std::cout << "send [" << wbuf << "] reply [" << rbuf << "]" << std::endl; if (n < 0) { std::cerr << "read error " << errno << std::endl; exit(-3); } close(socketfd); return 0; Reactor& actor = Reactor::get_instance(); EventHandler* handler = new ListenHandler(socketfd); actor.regist(handler, ReadEvent); while(true) { actor.dispatch(-1); std::cout << "client one loop" << std::endl; } return 0; }
#include#include #include #include #include "reactor.h" #include "event_handler.h" #include "listen_handler.h" #include "event.h" int main() { int listenfd = -1; if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { std::cerr << "socket error " << errno << std::endl; exit(-1); } struct sockaddr_in seraddr; seraddr.sin_family = AF_INET; seraddr.sin_port = htons(53031); seraddr.sin_addr.s_addr = htonl(INADDR_ANY); if (bind(listenfd, (struct sockaddr*)&seraddr, sizeof(seraddr)) < 0) { std::cerr << "bind error " << errno << std::endl; exit(-2); } if (listen(listenfd, 5) < 0) { std::cerr << "listen error " << errno << std::endl; exit(-3); } Reactor& actor = Reactor::get_instance(); EventHandler* handler = new ListenHandler(listenfd); actor.regist(handler, ReadEvent); while(true) { actor.dispatch(100); //std::cout << "one loop" << std::endl; } return 0; }