基于epoll的tcp-socket通信服务模块
程序员文章站
2022-07-14 09:19:05
...
最近一直研究对自己的开源物联网边缘服务瘦身,以适应资源更短缺的设备上运行,因此需要裁减第三方库的依赖,由于原先tcp-socket通信是调用第三方库的,因此准备动手为开源项目写一个简要适合项目需要的服务模块。
由于服务端需要针对每个客户端有定制下行通讯需求,因此采用epoll以便于明确知道句柄,进行特定通信。
epoll相关函数不多,容易理解,相关知识文章一大把,我就不阐述了。这里讲述一下我的项目要求:
通信服务模块作为一个独立运行线程,构建一个epoll,监听客户端连接,然后进入循环,获取事件描述信息,并处理相关事务;
新连接进来后,除了注册fd到epoll外,我们还需要定义有一个额外的容器自行管理fd;
在写入、读取异常时除了从epoll删除fd外,同时也需要从容器删除fd;
另外由于业务通信是有次序的,构建了两个缓存队列存储读取、写入的业务数据。
下面来看具体实现源码:
epoll_socket.h
#ifndef _EPOLL_SOCKET_H_
#define _EPOLL_SOCKET_H_
/**********************************************************************************
*Copyright 2020-05-06, pyfree
*
*File Name : epoll_socket.h
*File Mark :
*Summary :
*
*Current Version : 1.00
*Author : pyfree
*FinishDate :
*
*Replace Version :
*Author :
*FinishDate :
***********************************************************************************/
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string>
#include <set>
#include "thread_py.h"
#include "queuedata.h"
#include "Mutex.h"
struct ItemCache
{
ItemCache()
: data(""),fd(-1)
{
};
ItemCache(std::string data_)
: data(data_),fd(-1)
{
};
ItemCache(std::string data_,int fd_)
: data(data_),fd(fd_)
{
};
std::string data;
int fd;
};
class epoll_socket : public Thread_py
{
public:
epoll_socket();
~epoll_socket();
/**
* 打开服务侦听
* @param port {int} 侦听端口
* @param isblock_listen {bool} 是否阻塞
* @return {void}
*/
bool open(int port, bool isblock_listen = false);
int get_socketfd();
/**
* 线程运行函数
* @return {char*}
*/
int run();
/**
* 指定fd发送字符串
* @param item {string} 字符内容
* @param fd {int} -1时,对所有fd发送
* @return {void}
*/
void send(std::string item, int fd=-1);
/**
* 获取客户端内容
* @param item {string&} 字符内容
* @param fd {int&} 客户端fd
* @return {bool}
*/
bool get(std::string &item,int &fd);
/**
* 获取最新异常信息
* @return {char*}
*/
char* get_errmsg();
/**
* 设置读写数据时是否打印输出
* @param flag {bool} 是否打印
* @return {char*}
*/
void setPrintFlag(bool flag);
/**
* 设置客户端事务是否阻塞
* @param flag {bool} 是否阻塞
* @return {char*}
*/
void setClientBlock(bool flag);
private:
void add_event(int fd, int state);
void del_event(int fd, int state);
void mod_event(int fd, int state);
void add_client(int fd);
void del_client(int fd);
void handle_events(struct epoll_event *events, int num);
bool handle_accept();
bool do_read(int fd);
bool do_write(int fd);
private:
bool running;
bool print_flag;
bool isblock_client;
int socketfd;
struct sockaddr_in servaddr;
int epollfd;
char err_msg[256];
std::set<int> fds_client;
PYMutex fds_mutex;
QueueData<ItemCache> buffer_read;
QueueData<ItemCache> buffer_write;
};
#endif
epoll_socket.cpp
#include "epoll_socket.h"
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <iostream>
#define MAXBUFFSIZE 1024
#define MAXEVENTS 500
#define FDSIZE 1000
epoll_socket::epoll_socket()
: running(true)
, print_flag(false)
, isblock_client(false)
{
socketfd = 0;
memset(&servaddr, 0, sizeof(servaddr));
}
epoll_socket::~epoll_socket()
{
running = false;
}
bool epoll_socket::open(int port, bool isblock_listen)
{
if ((socketfd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
return false;
}
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);//IP地址设置成INADDR_ANY,让系统自动获取本机的IP地址。
servaddr.sin_port = htons(port);
if (!isblock_listen) {
int flags = fcntl(socketfd, F_GETFL, 0);
fcntl(socketfd, F_SETFL, flags | O_NONBLOCK);//设置为非阻塞
}
//设置重用地址,防止Address already in use
int on = 1;
if (setsockopt(socketfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1){
snprintf(err_msg, sizeof(err_msg)
, "set reuse addr error: %s(errno: %d)\n"
, strerror(errno), errno);
return false;
}
//将本地地址绑定到所创建的套接字上
if (bind(socketfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1){
snprintf(err_msg, sizeof(err_msg)
, "bind socket error: %s(errno: %d)\n"
, strerror(errno), errno);
return false;
}
//开始监听是否有客户端连接
if (listen(socketfd, 5) == -1) {
snprintf(err_msg, sizeof(err_msg)
, "listen socket error: %s(errno: %d)\n"
, strerror(errno), errno);
return false;
}
std::cout << "create socket success\n";
return true;
}
int epoll_socket::get_socketfd()
{
return socketfd;
}
int epoll_socket::run()
{
//创建一个描述符
if ((epollfd = epoll_create(FDSIZE)) == -1){
snprintf(err_msg, sizeof(err_msg)
, "listen socket error: %s(errno: %d)\n"
, strerror(errno), errno);
return -1;
}
//添加监听描述符事件
add_event(socketfd, EPOLLIN);
struct epoll_event events[MAXEVENTS];
int ret;
while (running) {
//获取已经准备好的描述符事件
ret = epoll_wait(epollfd, events, MAXEVENTS, 1);
handle_events(events, ret);
}
close(epollfd);
return 0;
}
void epoll_socket::send(std::string item,int fd/*=-1*/)
{
if(fd<0){
fds_mutex.Lock();
std::set<int> fds_ = fds_client;
fds_mutex.Unlock();
std::set<int>::iterator it = fds_.begin();
for (; it!=fds_.end(); ++it)
{
buffer_write.add(ItemCache(item,fd));
mod_event(*it,EPOLLOUT);
}
}else{
fds_mutex.Lock();
bool f_ = (fds_client.find(fd)!=fds_client.end());
fds_mutex.Unlock();
if(f_)
{
buffer_write.add(ItemCache(item,fd));
mod_event(fd,EPOLLOUT);
}
}
}
bool epoll_socket::get(std::string &item,int &fd)
{
bool ret =false;
ItemCache it;
if( buffer_read.pop(it))
{
item = it.data;
fd = it.fd;
ret = true;
}
return ret;
}
void epoll_socket::add_event(int fd, int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
/*
//如果是ET模式,设置EPOLLET
ev.events |= EPOLLET;
*/
//设置是否阻塞
if(!isblock_client){
int flags = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}
void epoll_socket::del_event(int fd, int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}
void epoll_socket::mod_event(int fd, int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
}
void epoll_socket::add_client(int fd)
{
fds_mutex.Lock();
fds_client.insert(fd);
fds_mutex.Unlock();
}
void epoll_socket::del_client(int fd)
{
fds_mutex.Lock();
fds_client.erase(fd);
fds_mutex.Unlock();
}
void epoll_socket::handle_events(epoll_event * events, int num)
{
int i;
int fd;
//进行选好遍历
for (i = 0; i < num; i++) {
fd = events[i].data.fd;
//根据描述符的类型和事件类型进行处理
if ((fd == socketfd) && (events[i].events& EPOLLIN)){
handle_accept();
}else if (events[i].events & EPOLLIN){
do_read(fd);
}else if (events[i].events & EPOLLOUT){
do_write(fd);
}else{
del_client(fd);
close(fd);
}
}
}
bool epoll_socket::handle_accept()
{
int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen = sizeof(cliaddr);
clifd = accept(socketfd, (struct sockaddr*)&cliaddr, &cliaddrlen);
if (clifd == -1) {
snprintf(err_msg, sizeof(err_msg)
, "listen socket error: %s(errno: %d)\n"
, strerror(errno), errno);
return false;
}
else {
char msg[128] = { 0 };
//获取端口错误
sprintf(msg,"accept a new client(%d):%s:%d\n"
, clifd, inet_ntoa(cliaddr.sin_addr)
, cliaddr.sin_port);
std::cout << msg;
//添加一个客户描述符和事件
add_event(clifd, EPOLLIN);
add_client(clifd);
}
}
bool epoll_socket::do_read(int fd)
{
char buf[MAXBUFFSIZE]={0};
int buflen = read(fd, buf, MAXBUFFSIZE);
if (buflen == -1) {
snprintf(err_msg, sizeof(err_msg)
, "read error(%d): %s(errno: %d)\n"
, fd, strerror(errno), errno);
std::cout << err_msg;
del_client(fd);
close(fd);
del_event(fd, EPOLLIN);
return false;
}
else if (buflen == 0) {
char msg[128] = { 0 };
sprintf(msg,"client(%d) close.\n", fd);
std::cout << msg;
del_client(fd);
close(fd);
del_event(fd, EPOLLIN);
return true;
}
else {
if(print_flag){
char msg[MAXBUFFSIZE] = { 0 };
sprintf(msg, "read message is:%s\n", buf);
std::cout << msg;
}
buffer_read.add(ItemCache(std::string(buf,buflen),fd));
}
return true;
}
bool epoll_socket::do_write(int fd)
{
ItemCache it;
if(!buffer_write.pop(it))
{
return false;
}
if(it.fd!=fd){
snprintf(err_msg, sizeof(err_msg)
, "write error,fd(%d,%d)\n",it.fd,fd);
std::cout << err_msg;
}
int nwrite;
nwrite = write(fd, it.data.c_str(), it.data.length());
if (nwrite == -1)
{
snprintf(err_msg, sizeof(err_msg)
, "write error: %s(errno: %d)\n"
, strerror(errno), errno);
std::cout << err_msg;
del_client(fd);
close(fd);
del_event(fd, EPOLLOUT);
return false;
}
else{
if(print_flag){
char msg[MAXBUFFSIZE] = { 0 };
sprintf(msg, "write message is:%s\n", it.data.c_str());
std::cout << msg;
}
mod_event(fd, EPOLLIN);
}
return true;
}
char * epoll_socket::get_errmsg()
{
return err_msg;
}
void epoll_socket::setPrintFlag(bool flag)
{
print_flag = flag;
}
void epoll_socket::setClientBlock(bool flag)
{
isblock_client = flag;
}
其他依赖的源码以及编译一并给出:
线程锁相关:
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _PYMUTEX_H_
#define _PYMUTEX_H_
/***********************************************************************
*Copyright 2020-03-06, pyfree
*
*File Name : Mutex.h
*File Mark :
*Summary : 线程锁
*
*Current Version : 1.00
*Author : pyfree
*FinishDate :
*
*Replace Version :
*Author :
*FinishDate :
************************************************************************/
#ifdef WIN32
//#include <windows.h>
#else
#include <pthread.h>
#endif
typedef void *HANDLE;
class IMutex
{
public:
virtual ~IMutex() {}
/**
* 上锁
* @return {void}
*/
virtual void Lock() const = 0;
/**
* 尝试上锁
* @return {void}
*/
virtual bool TryLock() const = 0;
/**
* 解锁
* @return {void}
*/
virtual void Unlock() const = 0;
};
class PYMutex : public IMutex
{
public:
PYMutex();
~PYMutex();
virtual void Lock() const;
virtual bool TryLock() const;
virtual void Unlock() const;
private:
#ifdef _WIN32
HANDLE m_mutex;
#else
mutable pthread_mutex_t m_mutex;
#endif
};
#endif //_PYMUTEX_H_
#include "Mutex.h"
#ifdef WIN32
#include <windows.h>
#endif
//#include <iostream>
#include <stdio.h>
PYMutex::PYMutex()
{
#ifdef _WIN32
m_mutex = ::CreateMutex(NULL, FALSE, NULL);
#else
pthread_mutex_init(&m_mutex, NULL);
#endif
}
PYMutex::~PYMutex()
{
#ifdef _WIN32
::CloseHandle(m_mutex);
#else
pthread_mutex_destroy(&m_mutex);
#endif
}
void PYMutex::Lock() const
{
#ifdef _WIN32
//DWORD d = WaitForSingleObject(m_mutex, INFINITE);
WaitForSingleObject(m_mutex, INFINITE);
/// \todo check 'd' for result
#else
pthread_mutex_lock(&m_mutex);
#endif
}
bool PYMutex::TryLock() const
{
#ifdef _WIN32
DWORD dwWaitResult = WaitForSingleObject(m_mutex, 0);
if (dwWaitResult != WAIT_OBJECT_0 && dwWaitResult != WAIT_TIMEOUT) {
printf("thread WARNING: bad result from try-locking mutex\n");
}
return (dwWaitResult == WAIT_OBJECT_0) ? true : false;
#else
return (0==pthread_mutex_trylock(&m_mutex))?true:false;
#endif
};
void PYMutex::Unlock() const
{
#ifdef _WIN32
::ReleaseMutex(m_mutex);
#else
pthread_mutex_unlock(&m_mutex);
#endif
}
容器队列相关:
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef _QUEUE_DATA_H_
#define _QUEUE_DATA_H_
/***********************************************************************
*Copyright 2020-03-06, pyfree
*
*File Name : queuedata.h
*File Mark :
*Summary :
*数据队列类,线程安全
*
*Current Version : 1.00
*Author : pyfree
*FinishDate :
*
*Replace Version :
*Author :
*FinishDate :
************************************************************************/
#include <queue>
#include <deque>
#include <stdio.h>
#include <string.h>
#include "Mutex.h"
template <class T>
class QueueData
{
public:
QueueData(std::string desc = "thread_queue");
~QueueData();
//////////////////////////////////////////////////////////////
/**
* 获取队列大小
* @return {int } 队列大小
*/
int size();
/**
* 判定队列是否为空
* @return {bool } 是否为空队列
*/
bool isEmpty();
/**
* 获取队列头元素
* @param it {T&} 头元素
* @return {bool } 是否成功
*/
bool getFirst(T &it);
/**
* 删除元素
* @return {bool } 是否成功
*/
bool removeFirst();
/**
* 获取队列头元素,并从队列终删除
* @param it {T&} 头元素
* @return {bool } 是否成功
*/
bool pop(T &it);
/**
* 从队列头开始逐步获取多个元素,并剔除
* @param its {queue<T>&} 获取到的元素集
* @param sizel {int} 一次获取多少个
* @return {bool } 至少获取一个元素以上则成功
*/
bool getList(std::queue<T> &its,unsigned int sizel=5);
/**
* 从队列尾部添加元素
* @param it {T} 被添加元素
* @return {void } 无返回
*/
void add(T it);
/**
* 从队列头部添加元素
* @param it {T} 被添加元素
* @return {void } 无返回
*/
void add_front(T it);
/**
* 清空元素
* @return {void }
*/
void clear();
private:
void init();
QueueData& operator=(const QueueData&) {return this;};
protected:
std::string queue_desc;
private:
/////////////////////////////点集转发////////////////////////////////////////////
//协议解析结果缓存
std::deque<T> datacache_queue; //队列容器
PYMutex m_Mutex; //线程锁,或者如果更彻底采用acl库,采用acl::thread_mutex替代
//
static unsigned int QSize; //队列大小约束,超出是会从队列头剔除旧数据腾出空位在对末添加数据
//
int queue_overS; //队列溢出次数计数
};
template <class T>
unsigned int QueueData<T>::QSize = 100;
template <class T>
QueueData<T>::QueueData(std::string desc)
: queue_desc(desc)
{
init();
};
template <class T>
void QueueData<T>::init()
{
queue_overS = 0;
};
template <class T>
QueueData<T>::~QueueData()
{
}
//////////////////////////////////////////////////////////
template <class T>
int QueueData<T>::size()
{
int ret = 0;
m_Mutex.Lock();
ret = static_cast<int>(datacache_queue.size());
m_Mutex.Unlock();
return ret;
}
template <class T>
bool QueueData<T>::isEmpty()
{
bool ret = false;
m_Mutex.Lock();
ret = datacache_queue.empty();
m_Mutex.Unlock();
return ret;
}
template <class T>
bool QueueData<T>::getFirst(T &it)
{
bool ret = false;
m_Mutex.Lock();
if (!datacache_queue.empty())
{
it = datacache_queue.front();
ret = true;
}
m_Mutex.Unlock();
return ret;
}
template <class T>
bool QueueData<T>::removeFirst()
{
bool ret = false;
m_Mutex.Lock();
if (!datacache_queue.empty())
{
datacache_queue.pop_front();
ret = true;
}
m_Mutex.Unlock();
return ret;
}
template <class T>
bool QueueData<T>::pop(T &it)
{
bool ret = false;
m_Mutex.Lock();
if (!datacache_queue.empty())
{
it = datacache_queue.front();
datacache_queue.pop_front();
ret = true;
}
m_Mutex.Unlock();
return ret;
};
template <class T>
bool QueueData<T>::getList(std::queue<T> &its,unsigned int sizel)
{
m_Mutex.Lock();
while (!datacache_queue.empty())
{
its.push(datacache_queue.front());
datacache_queue.pop_front();
if (its.size() >= sizel)
{
break;
}
}
m_Mutex.Unlock();
return !its.empty();
};
template <class T>
void QueueData<T>::add(T it)
{
m_Mutex.Lock();
if (datacache_queue.size() > QSize)
{
queue_overS++;
datacache_queue.pop_front();
}
datacache_queue.push_back(it);
m_Mutex.Unlock();
if (queue_overS >= 10)
{
//每溢出10次,报告一次
printf("add item to queue %s at end,but the size of QueueData is up to limmit size: %d .[%s %s %d]\n"
, queue_desc.c_str(), QSize
, __FILE__, __FUNCTION__, __LINE__);
queue_overS = 0;
}
}
template <class T>
void QueueData<T>::add_front(T it)
{
m_Mutex.Lock();
if (datacache_queue.size() > QSize)
{
queue_overS++;
datacache_queue.pop_front();
}
datacache_queue.push_front(it);
m_Mutex.Unlock();
if (queue_overS >= 10)
{
//每溢出10次,报告一次
printf("add item to queue %s at first,but the size of QueueData is up to limmit size: %d .[%s %s %d]\n"
, queue_desc.c_str(), QSize
, __FILE__, __FUNCTION__, __LINE__);
queue_overS = 0;
}
}
template <class T>
void QueueData<T>::clear()
{
m_Mutex.Lock();
datacache_queue.clear();
m_Mutex.Unlock();
queue_overS = 0;
}
#endif //_QUEUE_DATA_H_
线程相关:
#ifndef _THREAD_PY_H_
#define _THREAD_PY_H_
/**********************************************************************************
*Copyright 2020-05-06, pyfree
*
*File Name : thread_py.h
*File Mark :
*Summary :
*
*Current Version : 1.00
*Author : pyfree
*FinishDate :
*
*Replace Version :
*Author :
*FinishDate :
***********************************************************************************/
#include <pthread.h>
#include <unistd.h>
class Thread_py
{
private:
//current thread ID
pthread_t tid;
//thread status
int threadStatus;
//get manner pointer of execution
static void* run0(void* pVoid);
//manner of execution inside
void* run1();
public:
//threadStatus-new create
static const int THREAD_STATUS_NEW = 0;
//threadStatus-running
static const int THREAD_STATUS_RUNNING = 1;
//threadStatus-end
static const int THREAD_STATUS_EXIT = -1;
// constructed function
Thread_py();
~Thread_py();
//the entity for thread running
virtual int run()=0;
//start thread
bool start();
//gte thread ID
pthread_t getThreadID();
//get thread status
int getState();
//wait for thread end
void join();
//wait for thread end in limit time
void join(unsigned long millisTime);
};
#endif /* _Thread_py_H */
#include "thread_py.h"
#include <stdio.h>
void* Thread_py::run0(void* pVoid)
{
Thread_py* p = (Thread_py*) pVoid;
p->run1();
return p;
}
void* Thread_py::run1()
{
threadStatus = THREAD_STATUS_RUNNING;
tid = pthread_self();
run();
threadStatus = THREAD_STATUS_EXIT;
tid = 0;
pthread_exit(NULL);
}
Thread_py::Thread_py()
{
tid = 0;
threadStatus = THREAD_STATUS_NEW;
}
Thread_py::~Thread_py()
{
join(10);
}
int Thread_py::run()
{
while(true){
printf("thread is running!\n");
sleep(100);
}
return 0;
}
bool Thread_py::start()
{
return pthread_create(&tid, NULL, run0, this) == 0;
}
pthread_t Thread_py::getThreadID()
{
return tid;
}
int Thread_py::getState()
{
return threadStatus;
}
void Thread_py::join()
{
if (tid > 0)
{
pthread_join(tid, NULL);
}
}
void Thread_py::join(unsigned long millisTime)
{
if (tid == 0)
{
return;
}
if (millisTime == 0)
{
join();
}else
{
unsigned long k = 0;
while (threadStatus != THREAD_STATUS_EXIT && k <= millisTime)
{
usleep(100);
k++;
}
}
}
demo,main.cpp以及Makefile
#include <iostream>
#include "epoll_socket.h"
using namespace std;
int main(int argc, char **argv)
{
epoll_socket myepoll;
if(!myepoll.open(5000,true)){
cout << myepoll.get_errmsg();
}
myepoll.start();
myepoll.setPrintFlag(true);
std::string item;
int fd = 0;
int cout = 0;
while (true)
{
if(myepoll.get(item,fd))
{
// printf("read:%s\n",item.c_str());
char buf[32]={0};
sprintf(buf,"rec_count=%d",cout++);
myepoll.send(std::string(buf),fd);
}
usleep(10);
}
return 0;
}
CX=g++
BIN := .
TARGET := epolltest
source := Mutex.cpp thread_py.cpp epoll_socket.cpp main.cpp
$(TARGET) :
$(CX) -gdwarf-2 $(source) -o $(BIN)/$(TARGET) -lpthread
clean:
rm $(BIN)/$(TARGET)
编译后我本人是采用sscom5工具模拟客户端,启动3个,设置间隔分别为10、20、30ms发送间隔进行压力测试,跑了一个下午,数据读写依然稳定,有兴趣的朋友可以继续测试优化,例如增加客户端端吗,降低cup资源消耗。
坚持分享工程化,产品化的知识为主,更便捷实现技术落地,谢谢
上一篇: 内网安装angular-cli
下一篇: ntopng安装和基本使用教程