欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

handy网络库源码阅读

程序员文章站 2022-03-25 22:52:08
简洁易用的C++11网络库,From:https://github.com/yedf/handy 在整理过去的资料过程中,发现过去有关注过这一个网络库,简单看了一下属于轻量级的实现,因此本文将对该库进行简单的学习之旅,目标是对网络基础知识进一步巩固。 编译和运行 库目前实现了linux和mac环境, ......

简洁易用的c++11网络库,from:
在整理过去的资料过程中,发现过去有关注过这一个网络库,简单看了一下属于轻量级的实现,因此本文将对该库进行简单的学习之旅,目标是对网络基础知识进一步巩固。

编译和运行

库目前实现了linux和mac环境,需要支持c++11因此gcc的版本要大于4.8,在我的虚拟机ubuntu12.04是要升级gcc版本,然后使用云centos 7,之前安装的cmake版本是2.8.12,与要求的版本大于3.2不匹配,因此先升级cmake

  $ cd /tmp
  $ wget https://cmake.org/files/v3.3/cmake-3.3.2.tar.gz
  $ tar xzvf cmake-3.3.2.tar.gz
  $ cd cmake-3.3.2
  $ ./bootstrap
  $ gmake
  $ make install
#from : https://blog.csdn.net/fword/article/details/79347356

升级后能顺利编译。

网络库基础知识

既然是高性能网络库,那linux必然是epoll,在raw-examples带有对epoll的测试epoll.cc(水平触发)和epoll-et.cc(边缘触发)
水平触发:当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率!
edge_triggered(边缘触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符!

根据linux的man-page中说明边缘触发要求在epoll_ctrl_add的时候就对文件描述符进行epollin|epollout|epollet事件关注(建议只对客户端套接字),这能避免不断地使用epoll_ctl_mod修改对epollin和epollout事件地关注。通常情况下监听套接字为水平触发,客户套接字边缘触发,对监听套接字和客户套接字都要设置非阻塞模式。监听套接字使用水平触发的原因是,多个连接同时到达如果使用边缘触发则epoll只会通知一次,有一些tcp连接在就绪队列积累得不到及时处理,如果使用水平触发需要采取而外的处理方式(使用while循环accpet,直到accept返回-1且errno设置为eagin表示所有的连接处理完了)
epoll的系统函数定义如下:

#include <sys/epoll.h>
   typedef union epoll_data {
                   void    *ptr;
                   int      fd;
                   uint32_t u32;
                   uint64_t u64;
               } epoll_data_t;
   struct epoll_event {
       uint32_t     events;    // epoll events
       epoll_data_t data;      // user data variable
   };
/*
功能:创建epoll对象
[1]size无意义,要求大于0
返回值:成功为非负文件描述符,失败为-1
*/
int epoll_create(int size);

/*
功能:对epoll对象增加,修改或删除感兴趣事件,输入<文件描述符fd, 操作op, 事件epoll_event>
操作op:增epoll_ctl_add,改epoll_ctl_mod,删epoll_ctl_del
事件epoll_event.events:对应文件描述符可读epollin,可写epollout,对方关闭epollrdhup,异常epollpri
,错误epollerr,挂起epollhup,设置边缘触发epollet,设置只触发一次epolloneshot,epollwakeup,epollexclusive
返回值:0-成功,-1失败
*/
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

/*
功能:等待内核中的epoll_event事件可读或者timeout到达
[1]epfd是一个epoll实例句柄根据epoll_create得到
[2]epoll_event包含文件描述符和epoll事件,对应内存由用户开辟
[3]最多事件数,必须大于0
[4]超时事件,单位为ms
返回值:>0有对应个文件描述符发生了事件;0超时到达;-1发生错误
*/
int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);

下面是代码节选

//epoll.cc 水平触发

//main函数
//0)忽略sigpipe信号,避免对等方关闭后触发了写操作引起的sigpipe信号,而导致进程退出
::signal(sigpipe, sig_ign);
//1)定义了回馈的报文,长度1048576是为了测试写缓冲区满了的情况
httpres = "http/1.1 200 ok\r\nconnection: keep-alive\r\ncontent-type: text/html; charset=utf-8\r\ncontent-length: 1048576\r\n\r\n123456";
    for (int i = 0; i < 1048570; i++) {
        httpres += '\0';
    }
//2)创建epoll实例
int epollfd = epoll_create(1);
//3)创建socket监听套接字listenfd,设置非阻塞模式,bind,listen和加入到epollfd关注
int listenfd = socket(af_inet, sock_stream, 0);
int r = ::bind(listenfd, (struct sockaddr *) &addr, sizeof(struct sockaddr));
r = listen(listenfd, 20);
setnonblock(listenfd);
updateevents(epollfd, listenfd, epollin, epoll_ctl_add); //epoll_ctrl(epollfd,epoll_ctl_add,listenfd,ev.epollin)关注监听套接字的可读事件
//4)循环epoll_wait等待内核事件
for (;;) {  //实际应用应当注册信号处理函数,退出时清理资源
        loop_once(epollfd, listenfd, 10000); //调用epoll_wait,超时等待为10s,如果有事件返回也会立即返回
    }

//loop_once函数
int n = epoll_wait(efd, activeevs, kmaxevents, waitms);
for (int i = 0; i < n; i++) {
        int fd = activeevs[i].data.fd;
        int events = activeevs[i].events;
        if (events & (epollin | epollerr)) {
            if (fd == lfd) {
                handleaccept(efd, fd); //有连接到来,accpet得到对应文件描述符,调用updateevents加入efd的epollin关注列表
            } else {
                handleread(efd, fd); //客户端有数据,保存连接上下文到map<fd, con>cons中,根据http的协议(结尾"\n\n"或"\r\n\r\n")发送httpres给客户端,注意这里httpres太长,写write返回小于0且errno为eagain或ewouldblock,则要表示缓冲区已近满了不能再写了,要修改关注对应套接字的可读可写事件;后续回调可写继续写入,最后写完成后修改为只关注可读事件。
            }
        } else if (events & epollout) {
            if (output_log)
                printf("handling epollout\n");
            handlewrite(efd, fd);
        } else {
            exit_if(1, "unknown event");
        }
//updateevents函数
void updateevents(int efd, int fd, int events, int op) {
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = events;
    ev.data.fd = fd;
    printf("%s fd %d events read %d write %d\n", op == epoll_ctl_mod ? "mod" : "add", fd, ev.events & epollin, ev.events & epollout);
    int r = epoll_ctl(efd, op, fd, &ev); //把文件描述符fd加入到epoll对象efd关注
    exit_if(r, "epoll_ctl failed");
}

值得注意的是水平触发和边缘触发的区别,是在epoll_ctl中ev.events指定,默认为水平触发;后续要特别注意对可写事件的处理上,水平触发需要在写遇到wouldblock后关注可写事件,写完后取消关注可写事件,而边缘触发只是在epoll_ctl的add操作中指定epollet和同时关注可读可写事件,而后在写write数据中遇到ewouldblock直接跳出写循环等到内核说可以再写则继续写。关于读read每次都读到返回-1且error为eagain|ewouldblock,这种策略下就不用在读方面区分是水平模式还是边缘模式。

注意:作者给出的示例中,没有设置监听套接字so_reuseaddr,如果服务端断开而任一客户端没断开,服务端重新启动将出想bind失败,错误原因是"address already in use"会有约2s时间处于time_wait状态,建议服务端开始开启这个选项,当然也要考虑多次启动和抢占地址的情况出现。

功能模块

handy文件夹即网络库的核心,最后生成动态库和静态库,测试程序在example和10m两个文件夹,分析网络库将重点关注handy文件夹。handy文件夹主要的功能实现在如下文件中(从cmakelist文件可以看出)

  • ${project_source_dir}/handy/daemon.cc
  • ${project_source_dir}/handy/net.cc //定义网络设置辅助函数,比如setnonblock,setnodelay;设计读写缓冲区buffer
  • ${project_source_dir}/handy/codec.cc //定义编解码,目前支持以\r\n结尾和长度开始的消息
  • ${project_source_dir}/handy/http.cc //支持http
  • ${project_source_dir}/handy/conn.cc //tcp连接上下文
  • ${project_source_dir}/handy/poller.cc //对epoll的封装
  • ${project_source_dir}/handy/udp.cc //对udp的封装
  • ${project_source_dir}/handy/threads.cc //线程池和队列的封装
  • ${project_source_dir}/handy/file.cc //文件io的函数集
  • ${project_source_dir}/handy/util.cc //时间等基础函数
  • ${project_source_dir}/handy/conf.cc //ini配置文件读写的封装
  • ${project_source_dir}/handy/stat-svr.cc //
  • ${project_source_dir}/handy/port_posix.cc //网络字节序等系统相关网络辅助函数
  • ${project_source_dir}/handy/event_base.cc //事件循环和通道定义
  • ${project_source_dir}/handy/logging.cc) //日志

给上面功能分一下类:

  • 1)基础部件:daemon.cc, threads.cc, file.cc, util.cc, conf.cc, logging.cc
  • 2)系统相关:net.cc,poller.cc,port_posix.cc
  • 3)协议相关:codec.cc,udp.cc,http.cc
  • 4)网络封装:event_base.cc,conn.cc,stat-svr.cc
    对于基础部件可以单独测试,只看一下一些有趣的设计;对于系统相关的需要了解其作用;对于协议相关的要了解接口;对网络封装是本文的重点。

基础部件

公共函数util

  1. format返回格式化后的string,涉及到内存重分配
//util.h
struct util {
    static std::string format(const char *fmt, ...);
}
//util.cpp
string util::format(const char *fmt, ...) {
    char buffer[500]; //栈内存
    unique_ptr<char[]> release1;
    char *base;
    for (int iter = 0; iter < 2; iter++) {
        int bufsize;
        if(iter == 0) { //第一次尝试用char[500]去获取格式化数据
            bufsize = sizeof(buffer);
            base = buffer;
        } else { //第二次尝试用char[30000]去获取格式化数据
            bufsize = 30000;
            base = new char[bufsize]; //或许需要检查一下30k内存是否分配成功
            release1.reset(base); //新内存将由unique_ptr接管,即在程序真正退出前,unique_ptr对象销毁时同时销毁绑定的内存;
        }
        char *p = base;
        char *limit = base + bufsize;
        if (p < limit) {
            va_list ap;
            va_start(ap, fmt);
            p += vsnprintf(p, limit - p, fmt, ap);
            va_end(ap);
        }
        // truncate to available space if necessary
        if(p >= limit) {
            if(iter == 0) {
                continue;
            } else {
                p = limit - 1;
                *p = '\0';
            }
        }
        break;
    }
    return base;//注意这里是把char* 返回给一个临时结果string;如果是返回char *则会出现unique_ptr销毁一次而外部使用时崩溃,permission denid
}

以上主要的疑问:
1)p += vsnprintf(p, limit - p, fmt, ap);理论上p +=max(bufsize)会导致p>=limit出现吗?
--边界情况会出现p==limit而不会大于。

2)引入unique_ptr的作用是什么?是为了委托base的内存回收吗?即本程序会内存泄漏吗?
--unique_ptr的存在时为了函数结束后对成员进行回收,如果不用unique_ptr,那么会增加如下代码释放内存:

    string strtemp(base); //多了一次拷贝
    if(base != null && base != buffer) delete base; base = null; //多了一次释放,主要判断不为栈数组,否则非法释放
    return strtemp;

测试代码如下:

 56         string s1 = "hello";
 57         for(int i = 0; i < 99; i++) {
 58                 s1 += "hello";
 59         }
 60         printf("len(s1)=%d\n", s1.length()); //500
 61         string s2 = std::string(util::format("%s", s1.c_str() ) );
 62         printf("len(s2)=%d\n", s2.length()); //500
  1. 退出时自动调用某一个函数
//util.h
struct noncopyable {
   protected:
    noncopyable() = default;
    virtual ~noncopyable() = default;

    noncopyable(const noncopyable &) = delete;
    noncopyable &operator=(const noncopyable &) = delete;
};
struct exitcaller : private noncopyable {
    ~exitcaller() { functor_(); }
    exitcaller(std::function<void()> &&functor) : functor_(std::move(functor)) {}

   private:
    std::function<void()> functor_;
};
//usage.cc
    //...
    int fd = open(filename.c_str(), o_rdonly);
    if (fd < 0) {
        return status::ioerror("open", filename);
    }
    exitcaller ec1([=] { close(fd); });

上述的exitcaller类似lockguard,或者说go语言的defer,表示当变量离开作用域时调用某一个函数,defer实现如下和上面只差一个lambda匿名函数:

#pragma once
#include <functional>

#define connection(text1,text2) text1##text2
#define connect(text1,text2) connection(text1,text2)

class deferhelper {
 public:
    deferhelper(std::function<void ()> &&cb) : cb_(std::move(cb)) {}
    ~deferhelper() { if (cb_) cb_(); }
 private:
    std::function<void ()> cb_;
};
#define defer(code)  deferhelper connect(l,__line__) ([&](){code;})

线程类threads

封装了一个队列和线程池。
队列的优点时put会检查是否满,pop_wait会等待超时或丢列不为空;

template <typename t>
struct safequeue : private std::mutex, private noncopyable {
    static const int wait_infinite = std::numeric_limits<int>::max(); //最大等待时间ms
    // 0 不限制队列中的任务数
    safequeue(size_t capacity = 0) : capacity_(capacity), exit_(false) {}
    //队列满则返回false,否则push_back到items_中,并使用ready_.notify_one()通知一个去取
    bool push(t &&v);
    //超时则返回t(),出现在队列为空情况;不超时返回items_中头元素
    t pop_wait(int waitms = wait_infinite);
    //超时返回false;不超时,v中存储items_中头元素
    bool pop_wait(t *v, int waitms = wait_infinite);
    //有锁获取元素个数,即items_.size
    size_t size();
    //退出,置exit_标识为true
    void exit();
    //取退出标识
    bool exited() { return exit_; }

   private:
    std::list<t> items_;
    std::condition_variable ready_;
    size_t capacity_;
    std::atomic<bool> exit_;
    void wait_ready(std::unique_lock<std::mutex> &lk, int waitms); //等待waitms,调用ready.wait_until函数
};

多线程队列则时能利用多个线程消化safequeue中的任务

typedef std::function<void()> task;
extern template class safequeue<task>;

struct threadpool : private noncopyable {
    //创建线程池,threads指定线程个数建议为cpunum或2*cpunum,
    threadpool(int threads, int taskcapacity = 0, bool start = true);
    //销毁safequeue和一些打印信息
    ~threadpool();
    //使用线程从safequeue中取元素让后执行
    void start();
    //停止safequeue
    threadpool &exit() {
        tasks_.exit();
        return *this;
    }
    //等待线程集合退出,for(auto &t : threads_)t.join();
    void join();

    //队列满返回false, 使用std::move把右值引用变成引用:tasks_.push(move(task));
    bool addtask(task &&task);
    bool addtask(task &task) { return addtask(task(task)); }
    size_t tasksize() { return tasks_.size(); }

   private:
    safequeue<task> tasks_;
    std::vector<std::thread> threads_;
};

文件io相关file

  1. status结构体
    记录最后文件操作的执行状态
struct status {
    status() : state_(null) {}
    status(int code, const char *msg);//state = new char[strlen(msg) + 8];state[0-4]=(strlen(msg) + 8),state[4-8]=code
    //...
private:
    //    state_[0..3] == length of message
    //    state_[4..7]    == code
    //    state_[8..]  == message
    const char *state_;
  1. 文件io导出函数
//file.h
    //把文件filename的内容读到cont中
    static status getcontent(const std::string &filename, std::string &cont);
    //把cont写到文件filename中
    static status writecontent(const std::string &filename, const std::string &cont);
    //写入cont到临时文件tmpname,删除旧文件name,重命名tmpname文件为name文件
    static status renamesave(const std::string &name, const std::string &tmpname, const std::string &cont);
    //把文件夹dir中的文件名加入到result中,使用dirent.d中的readdir函数
    static status getchildren(const std::string &dir, std::vector<std::string> *result);
    //删除文件,使用unlink删除,c语言中的remove则内部使用了remove,不过remove也能删除目录
    static status deletefile(const std::string &fname);
    //创建目录,使用mkdir(name.c_str(), 0755),八进制0755表示文件权限为文件所有着7(r4+w2+e1),组5(r4+e1),其他用户5(r4+e1)
    static status createdir(const std::string &name);
    //删除文件夹deletedir
    static status deletedir(const std::string &name);
    //使用stat返回文件的信息
    static status getfilesize(const std::string &fname, uint64_t *size);
    //使用rename函数重命名一个文件
    static status renamefile(const std::string &src, const std::string &target);
    //使用access判断文件是否存在;或许何以通过stat返回失败-1且errno==enoent判断文件不存在
    static bool fileexists(const std::string &fname);

配置ini文件conf

为了程序的灵活性,一般都会有ini配置文件,ini配置文件的格式如下
[section]
key1 = value1
key2 = 2
作者导出接口如下:

//conf.h
struct conf {
    int parse(const std::string &filename); //解析文件filename的内容到values_
    std::string get(std::string section, std::string name, std::string default_value); //取字符串值section[name],没取到返回default_value
    long getinteger(std::string section, std::string name, long default_value);///取整数值section[name],没取到返回default_value
    double getreal(std::string section, std::string name, double default_value);//取浮点数值section[name],没取到返回default_value
    bool getboolean(std::string section, std::string name, bool default_value);//取布尔值section[name],没取到返回default_value
    std::list<std::string> getstrings(std::string section, std::string name);//取setction[name]对应的值
    std::map<std::string, std::list<std::string>> values_;//存储为section.key,value,为什么值是用list来存呢?因为有多行的value的情况。
    std::string filename; //对应解析的文件名

据实现描述这个conf参考了python的configparser,我喜欢轻量级mars的conf解析

日志logging

日志是服务器中比较重要的,因为发生异常基本都需要分析日志改善程序,日志库大部分都有glog的影子。对于服务端的日志,因为在多线程中,因此不能写串,有人提倡用prinf而不是ostream,ostream真的不是多线程安全,这一点待探索;日志是能分等级的,常见为debug,info,warning,fatal;日志可以是缓冲写或实时写,但要保证程序退出的时候尽量少的丢日志,尤其是异常退出的时候;日志是要支持滚动的,根据具体的要求按天滚动或者按大小滚动;每条日志头部有时间信息,尾部可能有文件和代码行信息。

通过查看logging.h的实现可以发现,日志分等级,日志是一个静态单例通过static logger &getlogger()返回,然后定义了一些宏对日志进行操作。文件要先设置文件名,然后真正写入是调用logv函数,写入前根据滚动规则获取要写入文件描述符,拼接当前时间等信息和传入的要写入的内容,实时写入到文件中。

守护deamon

实现是目的个人理解是为了让服务在后台运行,测试exmple/daemon.cc的程序,用户输入后终端会退出,但是服务会不退出。实现流程是fork一个子进程,然后父进程执行退出,调用setsid让子进程脱离当前终端的控制不随当前终端结束而结束。

系统相关部件

字节序转换和远程连接信息port_posix

实现了htobe的uint16_t,uint32_t,uint64_t,int16_t,int32_t,int64_t转换
实现了获取dns信息的gethostbyname("www.google.com")

net

  • fcntl设置文件描述符属性:setnonblock;setsockopt设置套接字属性:setreuseaddr,setreuseport,setnodelay
  • ip地址转换<string ip, port>到struct sockaddr_in addr_
  • 字符串slice切片,包含开始和结束字符指针,及一些相关操作;
  • 缓冲区buffer,设计一个可扩容动态伸缩的内存数组,本处实现的尾位置不可跨越(不可e_ < b_ ),即0<=b_ <= e_ <cap_。重要细节如下:
struct buffer {
    buffer() : buf_(null), b_(0), e_(0), cap_(0), exp_(512) {}
    ~buffer() { delete[] buf_; } //析构的时候销毁
    //统计属性
    size_t size() const { return e_ - b_; } //有效数据长度
    bool empty() const { return e_ == b_; } //没有有效数据
    char *data() const { return buf_ + b_; } //有效数据起地址
    char *begin() const { return buf_ + b_; }
    char *end() const { return buf_ + e_; }

    //内存分配,返回end()结果,分三种情况
    //1) end_ + len <= cap,足够内存容纳,不需要修改内存
    //2) size() + len < cap_ / 2,可容纳len,但一般以上的内存都在尾部,需要执行movehead即把有效数据移动到buf_上让b_=0
    //3) 其他情况,expand(len),扩张的大小为max(exp_, 2*cap_, size() + len)
    char *makeroom(size_t len);
    //分配长度为len的容量,返回数据结束位置
    char *allocroom(size_t len) {
        char *p = makeroom(len);
        addsize(len); //e_ += len;
        return p;
    }

    //增加一段数据
    buffer &append(const char *p, size_t len) {
        memcpy(allocroom(len), p, len); //1.调用allocroom分配足够容量,把新数据进去
        return *this;
    }
    //消费长度为len的数据,注意len一定小于size()
    buffer &consume(size_t len) {
        b_ += len;
        if (size() == 0)
            clear();
        return *this;
    }
    buffer &absorb(buffer &buf); //交换this和buf
private:
    char *buf_;//内存的首地址
    size_t b_, e_, cap_, exp_;//开始位置,结束位置,总容量,exp_期望大小
    void copyfrom(const buffer &b); //深拷贝b,先拷贝参数,然后this.buf_=new char[b.cap_];memcpy(this.buf_+b_,bu.buf_+b_,b.size())

多路复用epoll的封装poller

poll/epoll能管理的不仅仅是套接字,而是所有的文件描述符,在linux中管道,timefd_create,eventfd都是可以纳入epoll来管理,因此要对epoll做简单的封装,核心的内容是addchannel,removechannel,updatechannel对channel中的文件描述符fd和事件event的管理。

//poller.h
struct pollerbase : private noncopyable {
    int64_t id_;
    int lastactive_;
    pollerbase() : lastactive_(-1) {
        static std::atomic<int64_t> id(0);
        id_ = ++id;
    }
    virtual void addchannel(channel *ch) = 0;
    virtual void removechannel(channel *ch) = 0;
    virtual void updatechannel(channel *ch) = 0;
    virtual void loop_once(int waitms) = 0;
    virtual ~pollerbase(){};
};

pollerbase *createpoller(); //返回一个继承自pollerbase的pollerepoll

struct pollerepoll : public pollerbase {
    int fd_; //epoll对象,在构造函数中通过epoll_create得到
    std::set<channel *> livechannels_; //channel集合,可认为是要关注<fd,event>集合,不拥有他们的生命周器
    // for epoll selected active events
    struct epoll_event activeevs_[kmaxevents]; //epoll_wait返回的活跃文件描述符
    pollerepoll(); //epoll_create1(epoll_cloexec);
    ~pollerepoll(); //while (livechannels_.size()) {(*livechannels_.begin())->close();};  ::close(fd_);
    void addchannel(channel *ch) override; //加入关注int r = epoll_ctl(fd_, epoll_ctl_add, ch->fd(), &ev);livechannels_.insert(ch);
    void removechannel(channel *ch) override;//取消关注livechannels_.erase(ch);
    void updatechannel(channel *ch) override;//更新关注int r = epoll_ctl(fd_, epoll_ctl_mod, ch->fd(), &ev);activeevs_[i].data.ptr = null;(这一个是为什么呢?)
    void loop_once(int waitms) override;//等待epoll对象返回,回调对应的事件给通道lastactive_ = epoll_wait(fd_, activeevs_, kmaxevents, waitms);channel *ch = (channel *) activeevs_[i].data.ptr;ch->handlewrite();
};

协议相关

流数据长度和内容codec

tcp是基于字节流(stream)的可靠协议,客户端一条最小的有意义的数据称为一帧,基于流意味着数据帧可能两帧数据同时到达,或者数据帧不全的情况。服务端应用要根据和客户端约定的协议分离出一帧帧数据,响应相应的请求。

//codec.h
struct codecbase {
    // > 0 解析出完整消息,消息放在msg中,返回已扫描的字节数
    // == 0 解析部分消息
    // < 0 解析错误
    virtual int trydecode(slice data, slice &msg) = 0;
    virtual void encode(slice msg, buffer &buf) = 0;
    virtual codecbase *clone() = 0;
};
//以\r\n结尾的消息
struct linecodec : public codecbase {
    int trydecode(slice data, slice &msg) override; //找到以\r\n或\n结尾的,返回长度和msg
    void encode(slice msg, buffer &buf) override; //给msg加上\r\n写入到buf中
    codecbase *clone() override { return new linecodec(); }
}
//给出长度的消息,[4][len_4][msg_len]
struct lengthcodec : public codecbase {
    int trydecode(slice data, slice &msg) override;//首部8字节,第4-8字节为长度,如果有完成的数据返回长度和msg
    void encode(slice msg, buffer &buf) override;//给buf增加数据‘mbdt’+len(msg)+msg
    codecbase *clone() override { return new lengthcodec(); }
}

非可靠传输协议udp

udp是一种简单的面向数据报的运输层协议,不提供可靠性,只是把应用程序传给ip层的数据报发送出去,但是不能保证它们能到达目的地。在一些直播中会使用udp,有一些游戏开发者也探索了udp实现可靠性的可能。
udp创建的流程:

    int fd = socket(af_inet, sock_dgram, 0); //注意第二个参数为sock_dgram数据报流
    int r = net::setreuseaddr(fd);
    fatalif(r, "set socket reuse option failed");
    r = net::setreuseport(fd, reuseport);
    fatalif(r, "set socket reuse port option failed");
    r = util::addfdflag(fd, fd_cloexec);
    fatalif(r, "addfdflag fd_cloexec failed");
    r = ::bind(fd, (struct sockaddr *) &addr_.getaddr(), sizeof(struct sockaddr));

读写udp的命令如下:

    //recvfrom
    truct sockaddr_in raddr;
    socklen_t rsz = sizeof(raddr);
    ssize_t rn = recvfrom(fd, buf, bufsize, 0, (sockaddr *) &raddr, &rsz);
    if (rn < 0) {
        error("udp %d recv failed: %d %s", fd, errno, strerror(errno));
        return;
    }

    //sendto
    truct sockaddr_in raddr;
    socklen_t rsz = sizeof(raddr);
    int wn = ::sendto(fd, buf, bufsize, 0, (sockaddr *) raddr, rsz);

web常用http协议

http协议应该是每一个网络人直接接触最多的内容,因为bs和部分cs结构网络传输都是用http,因为其简单且描述的内容很全面。
http的交互分为客户端和服务端,客户端也可以是浏览器,客户端发起的请求叫做http请求(http request),其包括:request line + header + body,header与body之间有一个\r\n;http的请求方法有get, post, head, put, delete等。http请求的回复(http response)包括:status line + header + body (header分为普通报头,响应报头与实体报头)
一个典型的请求:

get http://nooverfit.com/wp/ http/1.1
accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
accept-language: zh-hans-cn,zh-hans;q=0.5
upgrade-insecure-requests: 1
user-agent: mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/70.0.3538.102 safari/537.36 edge/18.18362
accept-encoding: gzip, deflate
host: nooverfit.com
connection: keep-alive
cookie: hm_lvt_416c770ac83a9d9wewewe=15678wwewe,1568260075; hm_lvt_bfc6c239dfdfad0bbfed25f88a973fb0=1559dfd232

//http response
http/1.1 200 ok
server:
date: thu, 19 sep 2019 16:10:38 gmt
content-type: text/html; charset=utf-8
transfer-encoding: chunked
connection: keep-alive
vary: cookie,accept-encoding,user-agent
upgrade: h2,h2c
accept-ranges: bytes
referrer-policy: 

<html><head><title>this is title</title></head><body><h1>hello</h1>now is 20130611 02:14:31.518462</body></html>

对http实现来说首先是要解析请求和回复,httpmsg就是对http协议消息的解析,结果是分离出一个完整的请求帧

struct httpmsg {
    enum result {
        error,
        complete,
        notcomplete,
        continue100,
    };
    httpmsg() { httpmsg::clear(); };
    //内容添加到buf,返回写入的字节数
    virtual int encode(buffer &buf) = 0;
    //尝试从buf中解析,默认复制body内容
    virtual result trydecode(slice buf, bool copybody = true) = 0;
    //清空消息相关的字段
    virtual void clear();

    std::map<std::string, std::string> headers;
    std::string version, body;
    // body可能较大,为了避免数据复制,加入body2
    slice body2;

    std::string getheader(const std::string &n) { return getvaluefrommap_(headers, n); }
    slice getbody() { return body2.size() ? body2 : (slice) body; }

    //如果trydecode返回complete,则返回已解析的字节数
    int getbyte() { return scanned_; }
    //...
}

得到完整请求帧后就是分析对应的请求方法和请求资源

struct httprequest : public httpmsg {
    std::map<std::string, std::string> args;
    std::string method, uri, query_uri; //请求的方法和uri
    virtual int encode(buffer &buf);
    virtual result trydecode(slice buf, bool copybody = true);
    //...
}

处理完请求之后就是回馈给对应的客户端

struct httpresponse : public httpmsg {
    std::string statusword; //example "ok"
    int status; // example 200
    //...
}

网络封装

到了最后才是最难的网络封装部分,先上一个muduo网络库的图,这个是典型的reactor模式的设计,主要借鉴于java的nio网络模型的设计
handy网络库源码阅读

首先有一个事件循环,会实例化一个poller,然后也会导出定时器接口,然后应用层会是tcp或者http服务的套接字会半丁到channel,通过eventloop的updateloop加入poller对象关注,当有连接到来则回调channel中相关回调,最后传递到客户和服务方。handy的设计像是muduo的简化版本,没那么繁杂。even_base中实现和event_imp事件循环(不断调用poller::loop_once)和计时定时器,channel通道(文件描述符拥有着,控制关注事件,可读可写事件回调),

//event_base.cpp
//事件循环类
struct eventsimp {
pollerbase *poller_;
safequeue<task> tasks_;

void loop_once(int waitms) {
        poller_->loop_once(std::min(waitms, nexttimeout_));
        handletimeouts();
    }
void eventsimp::loop() {
    while (!exit_)
            loop_once(10000);
    //...
//添加超时任务
void safecall(const task &task) { safecall(task(task)); }
void safecall(task &&task) {
        tasks_.push(move(task));
        wakeup();
    }
//...
};

//通道,封装了可以进行epoll的一个fd
struct channel {
protected:
    eventbase *base_; //一个channel一定属于一个eventbase
    pollerbase *poller_; //base_->poller_
    int fd_; //初始化绑定的文件描述符
    short events_; //当前的关注事件
    int64_t id_; //递增标记
    std::function<void()> readcb_, writecb_, errorcb_; //读写错误回调

    // base为事件管理器,fd为通道内部的fd,events为通道关心的事件,构造最后会调用poller_->addchannel(this);加入poller中
    channel(eventbase *base, int fd, int events);

    //设置回调
    void onread(const task &readcb) { readcb_ = readcb; }
    void onwrite(const task &writecb) { writecb_ = writecb; }
    void onread(task &&readcb) { readcb_ = std::move(readcb); }
    void onwrite(task &&writecb) { writecb_ = std::move(writecb); }

    //启用读写监听
    void enableread(bool enable); //设置events_;更新通道poller_->updatechannel(this);
    void enablewrite(bool enable);
    void enablereadwrite(bool readable, bool writable);
    bool readenabled(); //返回是否关注了可读return events_ & kreadevent;
    bool writeenabled();//返回是否关注了可写return events_ & kwriteevent;
    
    //处理读写事件
    void handleread() { readcb_(); } //在poller的loop_once循环中,会根据struct epoll_event.data.ptr转换为channel,如果可读则调用对应的handleread
    void handlewrite() { writecb_(); }//在poller的loop_once循环中,会根据struct epoll_event.data.ptr转换为channel,如果可写则调用对应的handlewrite
}

在tcp数据能收到(回调)后,重要的是如何保存客户端的数据,处理完请求后发送给对应的客户端,因为有多个客户端的存在,因此要使用tcpconn来记录哪些tcp到来了,处理结果要回馈给哪个数据。

//conn.h
// tcp连接,使用引用计数
struct tcpconn : public std::enable_shared_from_this<tcpconn> {
    // tcp连接的个状态
    enum state {
        invalid = 1,
        handshaking,
        connected,
        closed,
        failed,
    };
    //服务端
    static tcpconnptr createconnection(eventbase *base, int fd, ip4addr local, ip4addr peer) {
            tcpconnptr con(new c);
            con->attach(base, fd, local, peer);
            return con;
        }
    void attach(eventbase *base, int fd, ip4addr local, ip4addr peer) {
        fatalif((destport_ <= 0 && state_ != state::invalid) || (destport_ >= 0 && state_ != state::handshaking),
            "you should use a new tcpconn to attach. state: %d", state_);
        base_ = base;
        state_ = state::handshaking;
        local_ = local;
        peer_ = peer;
        delete channel_;
        channel_ = new channel(base, fd, kwriteevent | kreadevent);
        trace("tcp constructed %s - %s fd: %d", local_.tostring().c_str(), peer_.tostring().c_str(), fd);
        tcpconnptr con = shared_from_this();
        con->channel_->onread([=] { con->handleread(con); });
        con->channel_->onwrite([=] { con->handlewrite(con); });
    }

    //发送数据
    void sendoutput() { send(output_); }//return ::write(channel_->fd, buf, bytes);if (wd == -1 && (errno == eagain || errno == ewouldblock)) //写对应fd,如果写失败关注可写事件(水平触发模式)
    
    //收到数据
    void handleread(const tcpconnptr &con) {
        while (state_ == state::connected) {
            input_.makeroom();
            int rd = readimp(channel_->fd(), input_.end(), input_.space());
            if(rd > 0) input_.addsize(rd);
        }
        //...
    }

    //客户端
    template <class c = tcpconn>
    static tcpconnptr createconnection(eventbase *base, const std::string &host, unsigned short port, int timeout = 0, const std::string &localip = "") {
        tcpconnptr con(new c);
        con->connect(base, host, port, timeout, localip); //执行connect
        return con;
    }
public:
    eventbase *base_; //属于哪一个事件循环
    channel *channel_; //属于哪一个通道
    buffer input_, output_; //输入和输出缓冲区
    ip4addr local_, peer_; //本地的套接字
    state state_; //连接状态
    tcpcallback readcb_, writablecb_, statecb_;//读写,连入/练出状态回调
    std::string desthost_, localip_;
    std::unique_ptr<codecbase> codec_; //对应codec
};

//服务器
struct tcpserver {
    tcpserver(eventbases *bases); //属于哪一个事件循环
    int bind(const std::string &host, unsigned short port, bool reuseport = false); //socket,bind,listen,创建listen_channel设置读回调为handleaccept()
    static tcpserverptr startserver(eventbases *bases, const std::string &host, unsigned short port, bool reuseport = false); //创建一个tcpserver,并调用bind函数
    void onconnstate(const tcpcallback &cb);//有新的连接连入
    // 消息处理与read回调冲突,只能调用一个
    void onconnmsg(codecbase *codec, const msgcallback &cb) {
        codec_.reset(codec);
        msgcb_ = cb;
        assert(!readcb_);
    }
private:
    eventbase *base_;//属于哪一个事件循环
    ip4addr addr_; //服务端地址
    channel *listen_channel_; //服务端的channel
    tcpcallback statecb_, readcb_; //读写回调
    msgcallback msgcb_; //消息回调
    std::unique_ptr<codecbase> codec_;
    void handleaccept();//有新的连接到来,accept得到客户套接字cfd,创建一个tcpconnptr绑定cfd,设置conn的读写和消息回调
    //...
};

结尾

以上就是handy的基本分析,总结来说算轻量级的muduo,可能还不应该用在生产环境,毕竟花一天多就能看得七七八八。最后就是示例代码了。

//example/echo.cc
#include <handy/handy.h>
using namespace handy;

int main(int argc, const char *argv[]) {
    eventbase base;
    signal::signal(sigint, [&] { base.exit(); });
    tcpserverptr svr = tcpserver::startserver(&base, "", 2099);
    exitif(svr == null, "start tcp server failed");
    svr->onconnread([](const tcpconnptr &con) { con->send(con->getinput()); });
    base.loop();
}
//example/http-hello.cc
#include <handy/handy.h>

using namespace std;
using namespace handy;

int main(int argc, const char *argv[]) {
    int threads = 1;
    if (argc > 1) {
        threads = atoi(argv[1]);
    }
    setloglevel("trace");
    multibase base(threads);
    httpserver sample(&base);
    int r = sample.bind("", 8081);
    exitif(r, "bind failed %d %s", errno, strerror(errno));
    sample.onget("/hello", [](const httpconnptr &con) {
        string v = con.getrequest().version;
        httpresponse resp;
        resp.body = slice("hello world");
        con.sendresponse(resp);
        if (v == "http/1.0") {
            con->close();
        }
    });
    signal::signal(sigint, [&] { base.exit(); });
    base.loop();
    return 0;
}