c++实现一个简易的网络缓冲区的实践
1. 前言
请思考以下几个问题:
1).为什么需要设计网络缓冲区,内核中不是有读写缓冲区吗?
需要设计的网络缓冲区和内核中tcp缓冲区的关系如下图所示,通过socket进行进行绑定。具体来说网络缓冲区包括读(接收)缓冲区和写(发送)缓冲区。设计读缓冲区的目的是:当从tcp中读数据时,不能确定读到的是一个完整的数据包,如果是不完整的数据包,需要先放入缓冲区中进行缓存,直到数据包完整才进行业务处理。设计写缓冲区的目的是:向tcp写数据不能保证所有数据写成功,如果tcp写缓冲区已满,则会丢弃数据包,所以需要一个写缓冲区暂时存储需要写的数据。
2).缓冲区应该设置为堆内存还是栈内存?
假设有一个服务端程序,需要同时连接多个客户端,每一个socket就是一个连接对象,所以不同的socket都需要自己对应的读写缓冲区。如果将缓冲区设置为栈内存,很容易爆掉,故将将其设置为堆内存更加合理。此外,缓冲区容量上限一般是有限制的,一开始不需要分配过大,仅仅在缓冲区不足时进行扩展。
3).读写缓冲区的基本要求是什么?
通过以上分析,不难得出读写缓冲区虽然是两个独立的缓冲区,但是其核心功能相同,可以复用其代码。
读写缓冲区至少提供两类接口:存储数据和读取数据
读写缓冲区要求:先进先出,保证存储的数据是有序的
4).如何界定数据包?
第一种使用特殊字符界定数据包:例如\n
,\r\n
,第二种通过长度界定数据包,数据包中首先存储的是整个数据包的长度,再根据长度进行读取。
5).几种常见的缓冲区设计
①ringbuffer+读写指针
ringbuffer是一段连续的内存,当末端已经写入数据后,会从头部继续写数据,所以感觉上像一个环,实际是一个循环数组。ringbuffer的缺点也很明显:不能够扩展、对于首尾部分的数据需要增加一次io调用。
②可扩展的读写缓冲区+读写指针
下图设计了一种可扩展的读写缓冲区,在创建时会分配一块固定大小的内存,整个结构分为预留空间数据空间。预留空间用于存储必要的信息,真正存储数据的空间由连续内存组成。此种缓冲区设计相对于ringbuffer能够扩展,但是也有一定的缺点:由于需要最大化利用空间,会将数据移动至开头,移动操作会降低读写速度。
本文实现可扩展的读写缓冲区+读写指针
2. 数据结构
①buffer类的设计与初始化
buffer类的数据结构如下所示,m_s
是指向缓冲区的指针,m_max_size
是缓冲区的长度,初始设置为10,并根据扩展因子m_expand_par
进行倍增。扩展因子m_expand_par
设置为2,表示每次扩增长度翻倍,也就是说缓冲区的长度随扩展依次为10、20、40、80。
class buffer{ public: buffer(); //构造 ~buffer(); int init(); //分配缓冲区 private: char* m_s; //缓冲区指针 size_t m_read_index; //读指针位置 size_t m_write_index; //写指针位置 size_t m_max_size; //缓冲区长度 size_t m_expand_par; //扩展因子 };
构造函数的初始化列表中初始化成员变量。实际初始化缓冲区在init函数中分配内存,大小为m_max_size
。不在构造函数中初始化缓冲区的原因是:如果构造函数中分配失败,无法处理,也可使用raii手段进行处理
buffer::buffer() :m_read_index(0),m_write_index(0),m_max_size(10), m_expand_par(2),m_s(nullptr) {} buffer::~buffer() { delete[] m_s; } int buffer::init() { m_s = new char[m_max_size](); if (m_s == nullptr) { cout << "分配m_s失败\n"; return -1; } return 0; }
②读写指针的位置变化
当缓冲区为空时,读写指针位置相同都为0。
在写入长度为6的数据后,读写指针位置如图
接着读取两个字节后,读写指针如图
③扩展缓冲区实现
扩展缓冲区实际分为两步,将有效数据前移至缓冲区头(最大化利用数据),再进行扩展。根据成员变量扩展因子m_expand_par
的值,将缓冲区按倍数扩大。
假设当前存储数据4个字节,读写指针如下图。需要新增9个字节
将数据前移至缓冲区头
扩展缓冲区为2倍
写入9个字节
实际需要实现的两个私有成员函数:调整数据位置至缓冲区头adjust_buffer()
和扩展expand_buffer()
,设置为私有属性则是因为不希望用户调用,仅仅在写入缓冲区前判断不够就进行扩展,用户不应该知道与主动调用。
class buffer { public: ... private: void adjust_buffer(); //调整数据位置至缓冲区头部头 void expand_buffer(size_t need_size); //扩展缓冲区长度 ... }
adjust_buffer()
实现如下,注释写的较为清楚,不再赘述
void buffer::adjust_buffer() { if (m_read_index == 0) //数据已经在头部,直接返回 return; int used_size = m_write_index - m_read_index; if (used_size == 0) { //缓冲区为空,重置读写指针 m_write_index = 0; m_read_index = 0; } else { cout << "调整前read_index write_index" << m_read_index << " " << m_write_index << endl; memcpy(m_s, &m_s[m_read_index], used_size); //将数据拷贝至头部 m_write_index -= m_read_index; //写指针也前移 cout << "调整了" << used_size << "个字节" << endl; m_read_index = 0; //读指针置0 } cout << "调整后read_index write_index" << m_read_index << " " << m_write_index << endl; }
扩展缓冲区实现如下:
- 首先根据需要写入的字节数判断缓冲区长度多大才能够容下
- 申请新的存储区,并将数据拷贝到新存储区
- 释放旧缓冲区,将新存储区作为缓冲区
void buffer::expand_buffer(size_t need_size) //need_size需要写入的字节数 { size_t used_size = m_write_index - m_read_index; //used_size表示已经存储的字节数 size_t remain_size = m_max_size - used_size; //remain_size表示剩余空间 size_t expand_size = m_max_size; while (remain_size < need_size) { //剩余空间不够时扩展,用while表示直到扩展至够用 expand_size *= m_expand_par; remain_size = expand_size - used_size; //cout << "扩展长度中... 总剩余 总长度 " << remain_size << " " << expand_size << endl; } char* s1 = new char[expand_size](); //申请新的空间 memcpy(s1, m_s, m_max_size); free(m_s); m_s = s1; //将新空间挂载到缓冲区 m_max_size = expand_size; //更新缓冲区总长度 //cout << "扩展结束,总长度为" << m_max_size << endl; }
3. 外部接口设计与实现
以读缓冲区为例需要提供的接口有:向缓冲区写入数据write_to_buffer()
,向缓冲区读取数据read_from_buffer()
,得到能够读取的最大字节数readable_bytes()
。
class buffer { public: void write_to_buffer(char* src); //从src中写数据 size_t readable_bytes(); //存储数据的字节数 size_t read_from_buffer(char *dst,int bytes); //读数据 size_t pop_bytes(size_t bytes); //丢弃数据 }
① 写入缓冲区write_to_buffer()
write_to_buffer()
实现的思路如流程图所示:
判断剩余空间:
剩余空间不够:调整数据至头部、扩展缓冲区
剩余空间足够:向下继续
判断当前空间:
当前空间不够:调整数据至头部
剩余空间足够:向下继续
存储数据
根据流程图实现起来逻辑非常清晰,src表示原始数据
void buffer::write_to_buffer(char* src) { size_t used_size = m_write_index - m_read_index; //used_size表示已经存储的字节数 size_t remain_size = m_max_size - used_size; //remain_size表示剩余空间 size_t cur_size = m_max_size - m_write_index; //cur_size表示当前能够存储的空间 size_t size = init_random_write(&src); //printf("已经使用%d,剩余总长度%d,剩余当前长度%d\n", used_size, remain_size, cur_size); if (size > remain_size) { //剩余空间不够 adjust_buffer(); expand_buffer(size); } else if (size > cur_size) { //剩余空间够,当前存不下 adjust_buffer(); } memcpy(&m_s[m_write_index], src, size); //存储数据 m_write_index += size; delete[] src; //更新并打印log //used_size = m_write_index - m_read_index; //remain_size = m_max_size - used_size; //cur_size = m_max_size - m_write_index; //printf("已经使用%d,剩余总长度%d,剩余当前长度%d\n", used_size, remain_size, cur_size); }
流程图中还出现随机一段数据,这是用来调试的。随机初始化一段长度为0~ 40,字符a~ z的数据,并写缓存区
static int get_random_len() { return rand() % 40; } static int get_random_ala() { return rand() % 26; } size_t buffer::init_random_write(char** src) { int size = get_random_len(); char ala = get_random_ala(); *src = new char[size]; cout << "准备写入的长度为" << size << " 值全是 " << (unsigned char)('a' + ala) << endl; for (int i = 0; i < size; i++) { (*src)[i] = 'a' + ala; } return size; }
② 读取缓冲区read_from_buffer()
read_from_buffer(char*dst,int read_size)
传入需要拷贝到目的地址和需要读取的字节数,需要注意的是需要读取的字节数为-1
表示全部读取,函数返回实际读取的字节数。实现如流程图所示:
代码如下
size_t buffer::read_from_buffer(char*dst,int read_size) { size_t read_max = m_write_index - m_read_index; //read_max存储的字节数 if (read_size == 0 || read_max == 0) //读取0字节和空缓存区时直接返回 return 0; if (read_size == -1) { //全读走 memcpy(dst, &m_s[m_read_index], read_max); m_read_index += read_max; cout << "读取了" << read_max << "个字节" << endl; } else if (read_size > 0) { //读取指定字节 if ((size_t)read_size > read_max) read_size = read_max; memcpy(dst, &m_s[m_read_index], read_size); m_read_index += read_size; cout << "读取了" << read_size << "个字节" << endl; } return read_size; //返回读取的字节数 }
③ 丢弃数据pop_bytes
size_t pop_bytes(size_t size)
传入需要丢弃的字节数,需要注意的是需要丢弃的字节数为-1
表示全部丢弃;-2表示随机丢弃0~ 40字节,函数返回实际丢弃的字节数。实现如流程图所示:
size_t buffer::pop_bytes(size_t size) { size_t read_max = m_write_index - m_read_index; //存储数据长度 //test random if (size == -2) size = get_random_len(); if (size == 0 || read_max == 0) //缓冲区为空或丢弃0字节返回 return 0; if (size == -1) { //全丢 m_read_index += read_max; cout << "丢弃了" << read_max << "个字节" << endl; return read_max; } if (size > 0) { //丢弃指定字节 if (size > read_max) size = read_max; m_read_index += size; cout << "丢弃了" << size << "个字节" << endl; } return size; }
④ 其他接口
peek_read()
和peek_write()
返回读写指针的位置
size_t peek_read(); //指向准备读的位置(调试用) size_t peek_write(); //指向准备写的位置(调试用) size_t buffer::peek_write() { return m_write_index; } size_t buffer::peek_read() { return m_read_index; }
4. 完整代码与测试
① 完整代码
buffer.h
#pragma once class buffer { public: buffer(); //构造 ~buffer(); //析构 int init(); //分配缓冲区 void write_to_buffer(char* src); //写数据 size_t pop_bytes(size_t bytes); //丢弃数据 size_t read_from_buffer(char *dst,int bytes);//读数据 size_t readable_bytes(); //得到存储数据的字节数 size_t peek_read(); //指向准备读的位置(调试用) size_t peek_write(); //指向准备写的位置(调试用) private: void adjust_buffer(); //调整数据位置至缓冲区头 void expand_buffer(size_t need_size); //扩展缓冲区长度 size_t init_random_write(char** src); //随机初始化一段数据(调试用) private: char* m_s; //缓冲区指针 size_t m_read_index; //读指针位置 size_t m_write_index; //写指针位置 size_t m_max_size; //缓冲区长度 size_t m_expand_par; //扩展因子 };
buffer.cpp
:
#include "buffer.h" #include<iostream> #include<time.h> using namespace std; int total_write = 0; //记录总写入 int total_read = 0; //记录总读取 static int get_random_len() { return rand() % 40; } static int get_random_ala() { return rand() % 26; } buffer::buffer() :m_read_index(0),m_write_index(0),m_max_size(10), m_expand_par(2),m_s(nullptr) {} buffer::~buffer() { delete[] m_s; } int buffer::init() { m_s = new char[m_max_size](); if (m_s == nullptr) { cout << "分配m_s失败\n"; return -1; } return 0; } size_t buffer::read_from_buffer(char*dst,int read_size) { size_t read_max = m_write_index - m_read_index; //read_max存储的字节数 if (read_size == 0 || read_max == 0) //读取0字节和空缓存区时直接返回 return 0; if (read_size == -1) { //全读走 memcpy(dst, &m_s[m_read_index], read_max); m_read_index += read_max; printf("读取完成:\t读取%d个字节\n", read_max); total_read += read_max; } else if (read_size > 0) { //读取指定字节 if ((size_t)read_size > read_max) read_size = read_max; memcpy(dst, &m_s[m_read_index], read_size); m_read_index += read_size; printf("读取完成:\t读取%d个字节\n", read_size); total_read += read_size; } return read_size; //返回读取的字节数 } size_t buffer::readable_bytes() { return m_write_index - m_read_index; } size_t buffer::peek_write() { return m_write_index; } size_t buffer::peek_read() { return m_read_index; } void buffer::write_to_buffer(char* src) { size_t used_size = m_write_index - m_read_index; //used_size表示已经存储的字节数 size_t remain_size = m_max_size - used_size; //remain_size表示剩余空间 size_t cur_size = m_max_size - m_write_index; //cur_size表示当前能够存储的空间 size_t size = init_random_write(&src); //printf("已经使用%d,剩余总长度%d,剩余当前长度%d\n", used_size, remain_size, cur_size); if (size > remain_size) { //剩余空间不够 adjust_buffer(); expand_buffer(size); } else if (size > cur_size) { //剩余空间够,当前存不下 adjust_buffer(); } memcpy(&m_s[m_write_index], src, size); //存储数据 m_write_index += size; delete[] src; //更新并打印log used_size = m_write_index - m_read_index; remain_size = m_max_size - used_size; cur_size = m_max_size - m_write_index; printf("写入完成:\t总存储%d,剩余空间%d,剩余当前空间%d\n", used_size, remain_size, cur_size); } size_t buffer::pop_bytes(size_t size) { size_t read_max = m_write_index - m_read_index; //存储数据长度 //test random if (size == -2) size = get_random_len(); if (size == 0 || read_max == 0) //缓冲区为空或丢弃0字节返回 return 0; if (size == -1) { //全丢 m_read_index += read_max; cout << "丢弃了" << read_max << "个字节" << endl; total_read += read_max; return read_max; } if (size > 0) { //丢弃指定字节 if (size > read_max) size = read_max; m_read_index += size; cout << "丢弃了" << size << "个字节" << endl; total_read += size; } return size; } size_t buffer::init_random_write(char** src) { int size = get_random_len(); total_write += size; *src = new char[size]; char ala = get_random_ala(); cout << "随机写入:\t长度为" << size << " 值全是 " << (unsigned char)('a' + ala) << endl; for (int i = 0; i < size; i++) { (*src)[i] = 'a' + ala; } return size; } void buffer::adjust_buffer() { if (m_read_index == 0) //数据已经在头部,直接返回 return; int used_size = m_write_index - m_read_index; if (used_size == 0) { //缓冲区为空,重置读写指针 m_write_index = 0; m_read_index = 0; } else { cout << "调整前read_index write_index" << m_read_index << " " << m_write_index << endl; memcpy(m_s, &m_s[m_read_index], used_size); //将数据拷贝至头部 m_write_index -= m_read_index; //写指针也前移 cout << "调整了" << used_size << "个字节" << endl; m_read_index = 0; //读指针置0 } cout << "调整后read_index write_index" << m_read_index << " " << m_write_index << endl; } void buffer::expand_buffer(size_t need_size) //need_size需要写入的字节数 { size_t used_size = m_write_index - m_read_index; //used_size表示已经存储的字节数 size_t remain_size = m_max_size - used_size; //remain_size表示剩余空间 size_t expand_size = m_max_size; while (remain_size < need_size) { //剩余空间不够时扩展,用while表示直到扩展至够用 expand_size *= m_expand_par; remain_size = expand_size - used_size; cout << "扩展长度中... 总剩余 总长度 " << remain_size << " " << expand_size << endl; } char* s1 = new char[expand_size](); //申请新的空间 memcpy(s1, m_s, m_max_size); free(m_s); m_s = s1; //将新空间挂载到缓冲区 m_max_size = expand_size; //更新缓冲区总长度 cout << "扩展结束,总长度为" << m_max_size << endl; }
② 测试
int main() { srand((unsigned)time(null)); //调试需要初始化随机种子 buffer* pbuffer = new buffer(); //创建buffer对象 if (pbuffer->init() != 0) //init函数分配缓冲区 return 0; { char* s = nullptr; //s是指向随机数据的指针 char* read = new char[1000]; //读取时将数据存储到的指针read size_t read_size = 0; //本次读取到的字节数 pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, -1); pbuffer->write_to_buffer(s); pbuffer->pop_bytes(-2); read_size = read_size = pbuffer-> read_from_buffer(read, 0); pbuffer->write_to_buffer(s); cout << "总写入\t" << total_write << endl;; cout << "总读取\t" << total_read << endl; cout << "目前写入" << total_write - total_read << endl; cout << "可读取\t" << pbuffer->readable_bytes()<< endl; printf(" write %d read %d \n", pbuffer->peek_write(),pbuffer->peek_read()); if (total_write - total_read != pbuffer->readable_bytes()) { //根据总写入-总读取和一共存储的字节数判断是否存储正确 cout << "error!!!" << endl; } else cout << "test is ok\n\n\n"; } delete s; delete[] read; delete pbuffer; return 0; }
随机1000000次测试
int main() { srand((unsigned)time(null)); //调试需要初始化随机种子 buffer* pbuffer = new buffer(); //创建buffer对象 if (pbuffer->init() != 0) //init函数分配缓冲区 return 0; char* s = nullptr; //s是指向随机数据的指针 char* read = new char[1000]; //读取时将数据存储到的指针read size_t read_size = 0; //本次读取到的字节数 unsigned long long time = 0; //调试的循环次数 while (1) { pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, -1); pbuffer->write_to_buffer(s); pbuffer->write_to_buffer(s); pbuffer->pop_bytes(-2); read_size = read_size = pbuffer-> read_from_buffer(read, 0); pbuffer->write_to_buffer(s); pbuffer->pop_bytes(-2); pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, -1); pbuffer->write_to_buffer(s); pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, 22); pbuffer->write_to_buffer(s); pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, -1); pbuffer->pop_bytes(-2); pbuffer->pop_bytes(-2); pbuffer->write_to_buffer(s); pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, 2); pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, 17); pbuffer->write_to_buffer(s); pbuffer->pop_bytes(-2); pbuffer->write_to_buffer(s); pbuffer->write_to_buffer(s); pbuffer->read_from_buffer(read, 18); cout << "总写入\t" << total_write << endl;; cout << "总读取\t" << total_read << endl; cout << "目前写入" << total_write - total_read << endl; cout << "可读取\t" << pbuffer->readable_bytes()<< endl; printf(" write %d read %d \n", pbuffer->peek_write(),pbuffer->peek_read()); if (total_write - total_read != pbuffer->readable_bytes()) { //根据总写入-总读取和一共存储的字节数判断是否存储正确 cout << "error!!!" << endl; break; } if (time == 1000000) //循环1000000次 { cout << "1000000 ok!!!" << endl; break; } cout << time++ << " is ok\n\n\n"; } delete s; delete[] read; delete pbuffer; return 0; }
到此这篇关于c++实现一个简易的网络缓冲区的实践的文章就介绍到这了,更多相关c++ 网络缓冲区内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
上一篇: SMTP email from C#
下一篇: synchronized凭什么锁得住?