欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

reactor模式C++实现

程序员文章站 2022-07-05 23:37:17
copy from github上的一份实现。。。找不到链接了。。。 * epoll主要负责fd到event类型的映射 * EventDemultiplexer管理fd <...

copy from github上的一份实现。。。找不到链接了。。。
* epoll主要负责fd到event类型的映射
* EventDemultiplexer管理fd <-> event类型 <-> eventhandler(即具体如何处理event的回调方法),从而间接实现fd <–event类型–> eventhandler的具体回调方法
* Reactor负责注册、管理、分配

核心代码

reactor.h
#include "event_handler.h"
#include "event.h"
#include "reactor_impl.h"

#ifndef REACTOR_H_
#define REACTOR_H_

class ReactorImpl; // Forward declaration (author's note: presumably to hide the concrete implementation).

/**
 * Process-wide entry point of the reactor.
 *
 * Every public operation is forwarded to a ReactorImpl object held through
 * a pointer. The only instance is the private static member `reactor`,
 * reachable through get_instance().
 */
class Reactor {
public:
    /** Returns a reference to the single static Reactor instance. */
    static Reactor& get_instance();

    /**
     * Registers `handler` for the events in `evt`.
     * @return the value returned by ReactorImpl::regist.
     */
    int regist(EventHandler* handler, Event evt);

    /** Forwards to ReactorImpl::remove for `handler`. */
    void remove(EventHandler* handler);

    /**
     * Forwards to ReactorImpl::dispatch.
     * @param timeout passed through unchanged; defaults to 0.
     */
    void dispatch(int timeout = 0);

private:
    // Construction, destruction, and copying are private so that no other
    // Reactor object can be created outside the class.
    Reactor();
    ~Reactor();
    Reactor(const Reactor&);
    Reactor& operator=(const Reactor&);

    ReactorImpl* _reactor_impl;  // allocated in the constructor, deleted in the destructor
    static Reactor reactor;      // the single instance
};

#endif // REACTOR_H_
reactor.cpp
#include "reactor.h"
#include 
#include 

// Definition of the static member that holds the one Reactor object.
Reactor Reactor::reactor;

// Hands out a reference to the static instance defined above.
Reactor& Reactor::get_instance() {
    return Reactor::reactor;
}

// Allocates the implementation object without throwing; a failed allocation
// is caught by the assertion (only in builds where assert is active).
Reactor::Reactor()
    : _reactor_impl(new (std::nothrow) ReactorImpl()) {
    assert(_reactor_impl != NULL);
}

// Releases the implementation object and clears the pointer.
Reactor::~Reactor() {
    // delete on a null pointer is a no-op, so no explicit check is needed.
    delete _reactor_impl;
    _reactor_impl = NULL;
}


// Forwards the registration to the implementation and returns its result.
int Reactor::regist(EventHandler* handler, Event evt) {
    const int status = _reactor_impl->regist(handler, evt);
    return status;
}

// Forwards the removal request to the implementation.
void Reactor::remove(EventHandler* handler) {
    _reactor_impl->remove(handler);
}

// Runs one dispatch pass in the implementation with the given timeout.
void Reactor::dispatch(int timeout) {
    _reactor_impl->dispatch(timeout);
}
reactor_impl.h
#ifndef REACTOR_IMPL_H_
#define REACTOR_IMPL_H_

#include <map>
#include "event.h"
#include "event_handler.h"
#include "event_demultiplexer.h"

/**
 * Implementation behind Reactor.
 *
 * Keeps a map from handle to EventHandler* and delegates readiness
 * notification to an EventDemultiplexer.
 */
class ReactorImpl {
public:
    ReactorImpl();
    ~ReactorImpl();

    /** Stores `handler` under its handle and registers `evt` with the demultiplexer. */
    int regist(EventHandler* handler, Event evt);

    /** Unregisters the handler's handle and drops its map entry. */
    void remove(EventHandler* handler);

    /** Runs one wait/dispatch pass; `timeout` is passed to the demultiplexer. */
    void dispatch(int timeout = 0);

private:
    // Not copyable: the class holds raw pointers that it deletes.
    ReactorImpl(const ReactorImpl&);
    ReactorImpl& operator=(const ReactorImpl&);

    EventDemultiplexer* _demultiplexer;
    std::map<Handle, EventHandler*> _handlers;  // handle -> handler
};

#endif // REACTOR_IMPL_H_
reactor_impl.cpp
#include "reactor_impl.h"
#include 
#include 
#include "epoll_demultiplexer.h"

// Creates the epoll-based demultiplexer without throwing; the assertion
// flags an allocation failure in builds where assert is active.
ReactorImpl::ReactorImpl()
    : _demultiplexer(new (std::nothrow) EpollDemultiplexer()) {
    assert(_demultiplexer != NULL);
}

// Deletes every handler still stored in the map, then the demultiplexer.
ReactorImpl::~ReactorImpl() {
    typedef std::map<Handle, EventHandler*>::iterator HandlerIter;
    for (HandlerIter iter = _handlers.begin(); iter != _handlers.end(); ++iter) {
        delete iter->second;
    }
    _handlers.clear();  // do not keep pointers to deleted handlers

    delete _demultiplexer;  // delete on NULL is a no-op
    _demultiplexer = NULL;
}

// Stores `handler` under its handle (unless that handle already has an entry)
// and registers `evt` for the handle with the demultiplexer.
// Returns -1 for a NULL handler, otherwise the demultiplexer's result.
int ReactorImpl::regist(EventHandler* handler, Event evt) {
    if (handler == NULL) {
        return -1;
    }
    const Handle handle = handler->get_handle();
    // std::map::insert leaves an existing entry untouched, which matches the
    // previous find-then-insert logic with a single lookup.
    _handlers.insert(std::make_pair(handle, handler));
    return _demultiplexer->regist(handle, evt);
}

// Unregisters the handler's handle from the demultiplexer, then erases and
// deletes the handler stored under that handle.
//
// The deleted object is whatever the map holds for the handle. When a handler
// calls this on itself (as SocketHandler::handle_error does) the handler is
// destroyed by this call and must not touch its members afterwards.
void ReactorImpl::remove(EventHandler* handler) {
    if (handler == NULL) {
        return;
    }
    const Handle handle = handler->get_handle();
    // Result deliberately ignored: the map entry is released even if the
    // demultiplexer did not know the handle.
    _demultiplexer->remove(handle);

    std::map<Handle, EventHandler*>::iterator iter = _handlers.find(handle);
    if (iter == _handlers.end()) {
        return;  // unknown handle: nothing to erase or delete
    }
    EventHandler* stored = iter->second;
    _handlers.erase(iter);
    delete stored;
}

// Runs one wait/dispatch pass: the demultiplexer waits up to `timeout` and
// invokes the callbacks of the handlers stored in the map.
void ReactorImpl::dispatch(int timeout) {
    _demultiplexer->wait_event(_handlers, timeout);
}
event.h
#ifndef EVENT_H_
#define EVENT_H_

// Bit set of event flags; combine the enumerators below with bitwise OR.
typedef unsigned int Event;

// Flag values usable in an Event.
// Note: the last enumerator has the same name as the enum itself, so the
// plain name `EventMask` refers to the value 0xff; write `enum EventMask`
// to name the type.
enum EventMask {
    ReadEvent  = 0x01,
    WriteEvent = 0x02,
    ErrorEvent = 0x04,
    EventMask  = 0xff
};

#endif // EVENT_H_
event_demultiplexer.h
#ifndef EVENT_DEMULTIPLEXER_H_
#define EVENT_DEMULTIPLEXER_H_

#include <map>
#include "event_handler.h"
#include "event.h"

/**
 * Abstract interface for waiting on a set of handles and dispatching the
 * resulting events to their handlers.
 */
class EventDemultiplexer {
public:
    EventDemultiplexer() {}
    virtual ~EventDemultiplexer() {}

    /**
     * Waits for events and invokes callbacks on the handlers in `handlers`.
     * @param handlers map from handle to handler
     * @param timeout  wait limit, interpreted by the implementation
     */
    virtual int wait_event(std::map<Handle, EventHandler*>& handlers, int timeout = 0) = 0;

    /** Starts watching `handle` for the events in `evt`. */
    virtual int regist(Handle handle, Event evt) = 0;

    /** Stops watching `handle`. */
    virtual int remove(Handle handle) = 0;
};

#endif // EVENT_DEMULTIPLEXER_H_
epoll_demultiplexer.h
#ifndef EPOLL_DEMULTIPLEXER_H_
#define EPOLL_DEMULTIPLEXER_H_

#include <map>
#include "event_handler.h"
#include "event.h"
#include "event_demultiplexer.h"

/** EventDemultiplexer implemented on top of an epoll descriptor. */
class EpollDemultiplexer : public EventDemultiplexer {
public:
    EpollDemultiplexer();
    virtual ~EpollDemultiplexer();

    virtual int wait_event(std::map<Handle, EventHandler*>& handlers, int timeout = 0);
    virtual int regist(Handle handle, Event evt);
    virtual int remove(Handle handle);

private:
    // Not copyable: a copy would close the same epoll descriptor twice.
    EpollDemultiplexer(const EpollDemultiplexer&);
    EpollDemultiplexer& operator=(const EpollDemultiplexer&);

    int _max_fd;    // despite the name, a counter that sizes the epoll_wait buffer
    int _epoll_fd;  // descriptor returned by epoll_create
};

#endif // EPOLL_DEMULTIPLEXER_H_
epoll_demultiplexer.cpp
#include 
#include 
#include 
#include 
#include 

#include "epoll_demultiplexer.h"

// Starts with a zero counter and a fresh epoll descriptor (size hint 1024).
// The result of epoll_create is stored without being checked.
EpollDemultiplexer::EpollDemultiplexer()
    : _max_fd(0),
      _epoll_fd(epoll_create(1024)) {
}

// Closes the epoll descriptor obtained in the constructor.
EpollDemultiplexer::~EpollDemultiplexer() {
    ::close(_epoll_fd);
}

int EpollDemultiplexer::wait_event(std::map& handlers, int timeout) {
    std::vector events(_max_fd);
    int num = epoll_wait(_epoll_fd, &events[0], _max_fd, timeout);
    if (num < 0) {
        //std::cerr << "WARNING: epoll_wait error " << errno << std::endl;
        return num;
    }

    for (int i = 0; i < num; ++i) {
        Handle handle = events[i].data.fd;
        if ((EPOLLHUP|EPOLLERR) & events[i].events) {
            assert(handlers[handle] != NULL);
            (handlers[handle])->handle_error();
        } else {
            if ((EPOLLIN) & events[i].events) {
                assert(handlers[handle] != NULL);
                (handlers[handle])->handle_read();
            }
            if (EPOLLOUT & events[i].events) {
                (handlers[handle])->handle_write();
            }
        }
    }
    return num;
}

// Starts watching `handle` in edge-triggered mode for the read/write events
// selected in `evt`.
//
// A handle that is already registered has its event set replaced instead.
// The counter used to size the epoll_wait buffer grows only when a new
// descriptor is actually added.
//
// @return 0 on success, -errno when epoll_ctl fails
int EpollDemultiplexer::regist(Handle handle, Event evt) {
    struct epoll_event ev = epoll_event();  // zero-initialised: events starts empty
    ev.data.fd = handle;
    ev.events = EPOLLET;
    if (evt & ReadEvent) {
        ev.events |= EPOLLIN;
    }
    if (evt & WriteEvent) {
        ev.events |= EPOLLOUT;
    }

    if (epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, handle, &ev) == 0) {
        ++_max_fd;
        return 0;
    }

    // Already registered: update the event set, the counter stays as it is.
    if (errno == EEXIST && epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, handle, &ev) == 0) {
        return 0;
    }

    const int err = errno;  // save before stream output can overwrite it
    std::cerr << "epoll_ctl add error " << err << std::endl;
    return -err;
}

// Stops watching `handle`.
//
// @return 0 on success, -errno when epoll_ctl fails
int EpollDemultiplexer::remove(Handle handle) {
    // The event argument is ignored for EPOLL_CTL_DEL, but kernels before
    // 2.6.9 require it to be non-NULL; pass a zeroed structure.
    struct epoll_event ev = epoll_event();
    if (epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, handle, &ev) != 0) {
        const int err = errno;  // save before stream output can overwrite it
        std::cerr << "epoll_ctl del error" << err << std::endl;
        return -err;
    }
    // Never let the counter go negative: it is used to size a vector.
    if (_max_fd > 0) {
        --_max_fd;
    }
    return 0;
}
event_handler.h
#ifndef EVENT_HANDLER_H_
#define EVENT_HANDLER_H_

// Identifier of an event source; the handlers in this project store
// socket descriptors in it.
typedef int Handle;

/**
 * Interface for objects that react to events on a single handle.
 */
class EventHandler {
public:
    EventHandler() {}
    virtual ~EventHandler() {}

    /** Returns the handle this handler is bound to. */
    virtual Handle get_handle() const = 0;

    /** Called when the handle is reported readable. */
    virtual void handle_read() = 0;

    /** Called when the handle is reported writable. */
    virtual void handle_write() = 0;

    /** Called when an error or hang-up is reported for the handle. */
    virtual void handle_error() = 0;
};

#endif // EVENT_HANDLER_H_
listen_handler.h
#include "event_handler.h"
#include "event.h"

#ifndef LISTEN_HANDLER_H_
#define LISTEN_HANDLER_H_

/**
 * EventHandler for a listening socket. The descriptor passed to the
 * constructor is closed by the destructor.
 */
class ListenHandler : public EventHandler {
public:
    ListenHandler(Handle fd);
    virtual ~ListenHandler();

    /** Returns the listening descriptor. */
    virtual Handle get_handle() const {
        return _listen_fd;
    }
    virtual void handle_read();
    virtual void handle_write();
    virtual void handle_error();

private:
    // Not copyable: a copy would close the same descriptor a second time.
    ListenHandler(const ListenHandler&);
    ListenHandler& operator=(const ListenHandler&);

    Handle _listen_fd;
};

#endif // LISTEN_HANDLER_H_
listen_handler.cpp
#include "listen_handler.h"
#include 
#include 
#include 
#include 
#include 
#include "event_handler.h"
#include "reactor.h"
#include "socket_handler.h"

// Stores the listening descriptor; no other setup is performed.
ListenHandler::ListenHandler(Handle fd)
    : _listen_fd(fd) {
}

// Closes the listening descriptor.
ListenHandler::~ListenHandler() {
    ::close(_listen_fd);
}

// Accepts one pending connection and registers a SocketHandler for it with
// the reactor (read events only).
void ListenHandler::handle_read() {
    const int fd = accept(_listen_fd, NULL, NULL);
    if (fd < 0) {
        return;  // nothing was accepted; do not wrap an invalid descriptor
    }

    EventHandler* h = new (std::nothrow) SocketHandler(fd);
    if (h == NULL) {
        close(fd);  // no handler will ever own this descriptor
        return;
    }

    Reactor& r = Reactor::get_instance();
    if (r.regist(h, ReadEvent) != 0) {
        // Registration failed: ask the reactor to release the handler kept
        // for this descriptor so it is not left behind unwatched.
        r.remove(h);
    }
}

// Write readiness is ignored for the listening socket.
void ListenHandler::handle_write() {
}

// Error notifications are ignored for the listening socket.
void ListenHandler::handle_error() {
}
socket_handler.h
#include "event_handler.h"
#include "event.h"

#ifndef SOCKET_HANDLER_H_
#define SOCKET_HANDLER_H_

/**
 * EventHandler for a connected socket. Owns the descriptor passed to the
 * constructor and a buffer of MAX_SIZE bytes; both are released by the
 * destructor.
 */
class SocketHandler : public EventHandler {
public:
    SocketHandler(Handle fd);
    virtual ~SocketHandler();

    /** Returns the connection's descriptor. */
    virtual Handle get_handle() const {
        return _socket_fd;
    }
    virtual void handle_read();
    virtual void handle_write();
    virtual void handle_error();

private:
    // Not copyable: a copy would close the descriptor and free the buffer twice.
    SocketHandler(const SocketHandler&);
    SocketHandler& operator=(const SocketHandler&);

    Handle _socket_fd;
    char* _buf;                         // MAX_SIZE bytes, allocated in the constructor
    static const int MAX_SIZE = 1024;   // buffer capacity in bytes
};

#endif // SOCKET_HANDLER_H_
socket_handler.cpp
#include "socket_handler.h"
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include "reactor.h"

// Stores the descriptor and allocates a zero-filled buffer of MAX_SIZE bytes.
// A failed allocation is caught by the assertion (when assert is active).
SocketHandler::SocketHandler(Handle fd)
    : _socket_fd(fd),
      _buf(new (std::nothrow) char[MAX_SIZE]) {
    assert(_buf != NULL);
    memset(_buf, 0, MAX_SIZE);
}

// Frees the buffer and closes the connection's descriptor.
SocketHandler::~SocketHandler() {
    delete[] _buf;
    ::close(_socket_fd);
}

// Reads up to MAX_SIZE bytes from the socket, echoes exactly the bytes that
// were received, then ends the connection through handle_error().
void SocketHandler::handle_read() {
    const ssize_t received = read(_socket_fd, _buf, MAX_SIZE);

    // Echo by byte count rather than strlen(): the data need not contain a
    // terminating NUL (a full MAX_SIZE read leaves none in the buffer) and
    // may contain embedded NUL bytes.
    ssize_t sent = 0;
    while (sent < received) {
        const ssize_t n = write(_socket_fd, _buf + sent, received - sent);
        if (n <= 0) {
            break;  // give up on error; the connection is closed below anyway
        }
        sent += n;
    }

    // Must remain the last statement: handle_error() removes this handler
    // from the reactor, which deletes the object.
    handle_error();
}

// Write readiness is not used by this handler.
void SocketHandler::handle_write() {
}

void SocketHandler::handle_error() {
    Reactor& r = Reactor::get_instance();
    r.remove(this);
}

demo

client.cpp
#include "sys/socket.h"
#include 
#include 
#include 
#include 
#include "reactor.h"
#include "event_handler.h"
#include "listen_handler.h"
#include "event.h"

// Demo client: connects to 127.0.0.1:53031, sends "hello world", reads one
// reply and prints both.
//
// Exit status: 0 on success, -1 socket failure, -2 connect failure,
// -3 read failure, -4 write failure.
int main() {
    const int socketfd = socket(AF_INET, SOCK_STREAM, 0);
    if (socketfd < 0) {
        std::cerr << "socket error " << errno << std::endl;
        exit(-1);
    }

    struct sockaddr_in seraddr;
    memset(&seraddr, 0, sizeof(seraddr));  // clear padding (sin_zero) before filling in fields
    seraddr.sin_family = AF_INET;
    seraddr.sin_port = htons(53031);
    seraddr.sin_addr.s_addr = inet_addr("127.0.0.1"); // TODO

    if (connect(socketfd, (struct sockaddr*)&seraddr, sizeof(seraddr)) < 0) {
        const int err = errno;  // save before close() or stream output can change it
        close(socketfd);
        std::cerr << "connect error " << err << std::endl;
        exit(-2);
    }

    const char wbuf[64] = "hello world";
    if (write(socketfd, wbuf, strlen(wbuf)) < 0) {
        const int err = errno;
        close(socketfd);
        std::cerr << "write error " << err << std::endl;
        exit(-4);
    }

    // Read at most sizeof(rbuf) - 1 bytes so the zero-filled buffer always
    // stays NUL-terminated for printing.
    char rbuf[64] = {0};
    const ssize_t n = read(socketfd, rbuf, sizeof(rbuf) - 1);
    if (n < 0) {
        const int err = errno;
        close(socketfd);
        std::cerr << "read error " << err << std::endl;
        exit(-3);
    }

    std::cout << "send [" << wbuf << "] reply [" << rbuf << "]" << std::endl;
    close(socketfd);
    return 0;
}
server.cpp
#include 
#include 
#include 
#include 
#include "reactor.h"
#include "event_handler.h"
#include "listen_handler.h"
#include "event.h"

// Demo server: listens on port 53031 on all interfaces and runs the reactor
// loop forever, dispatching with a timeout of 100 per pass.
//
// Exit status: -1 socket failure, -2 bind failure, -3 listen failure.
int main() {
    const int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        const int err = errno;  // save before stream output can change it
        std::cerr << "socket error " << err << std::endl;
        exit(-1);
    }

    struct sockaddr_in seraddr = sockaddr_in();  // zero-initialised, including sin_zero
    seraddr.sin_family = AF_INET;
    seraddr.sin_port = htons(53031);
    seraddr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(listenfd, (struct sockaddr*)&seraddr, sizeof(seraddr)) < 0) {
        const int err = errno;
        std::cerr << "bind error " << err << std::endl;
        exit(-2);
    }

    if (listen(listenfd, 5) < 0) {
        const int err = errno;
        std::cerr << "listen error " << err << std::endl;
        exit(-3);
    }

    // The reactor stores the handler (see ReactorImpl::regist) and deletes
    // it when it is removed or when the reactor is destroyed.
    Reactor& actor = Reactor::get_instance();
    EventHandler* handler = new ListenHandler(listenfd);
    actor.regist(handler, ReadEvent);

    while (true) {
        actor.dispatch(100);
    }

    return 0;  // not reached: the loop above never exits
}