欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

国标GB28181流媒体服务解决方案4G安防摄像头直播视频流媒体平台EasyGBS的libuv库通信协议封装-支持udp和tcpserver同时使用

程序员文章站 2022-07-04 21:46:52
...

背景分析

未来摄像机将不再是自出厂之后就一成不变的,而是根据用户的需要,通过加载不同软件实现不同的业务功能,融入软件定义产品的新时代。

国标GB28181流媒体服务解决方案4G安防摄像头直播视频流媒体平台EasyGBS的libuv库通信协议封装-支持udp和tcpserver同时使用

 

通过规模化、多样化的智能前端摄像机进行精准的数据采集,后台强大的云计算和视频解析系统对采集的数据进行准确的解析和表述,庞大的大数据分析与挖掘系统对海量数据进行高效精准的处理,才能够真正的让视频监控协助用户准确的观察、识别和应对周边的事物,做到真正的拥抱智能时代。

国标GB28181流媒体服务解决方案4G安防摄像头直播视频流媒体平台EasyGBS的libuv库通信协议封装-支持udp和tcpserver同时使用

 

我们都知道,libuv的通信库比较强大,而且一直都有人维护,所有就使用这个通信库做服务端接收和发送,在库封装好之后,发现在接收和发送消息的时候总是卡住了,才发现是使用libuv库的发送的时候出现的问题,果断将发送直接用底层发送,使用socket发送,下面附注一下libuv通信库的源码,希望对大家有帮助。

1、首先下载libuv库,然后编译一份libuv.a的静态库

2、对应用层进行封装,下面粘贴一下源码:

#ifndef `ls_uv_socket_trans_h` 
#define `ls_uv_socket_trans_h` 
 
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <poll.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <arpa/inet.h>
#include <sys/mman.h>
#include <map>
 
#include "MyList.h"
#include "MyThread.h"
#include "TypeDef.h"
#include "tool.h"
#include "SysIf.h"
#include "uv.h"
 
 
#define UV_TCPSVR_LISTEN_MAXCONN 1024
#define UV_SOCKET_BUFFER_SIZE	1024*1024*10
 
 
//typedef void (*UvRecvPacketCallBack)(int sock,const char *pData, int len,const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort, bool bTcpRecv);
 
using namespace std;
 
typedef struct {
    uv_write_t req;
    uv_buf_t buf;
} write_req_t;
 
typedef struct _TUvSocketItem
{
	int 					socketfd;
	SOCKETTYPE_E			socktype;
	char					srcip[IPSTR_MAX_LEN + 1];
	int						srcport;
	char					destip[IPSTR_MAX_LEN + 1];
	int						destport;
	
	//uv_tcp_t 				tcphandle;
	//uv_udp_t				udphandle;
	//uv_buf_t 				recvbuf;   
	void 					*sockethandle;
	
	char 					*pTcpRecvData;
	int 					nTcpRecvLen;
 
	_TUvSocketItem()
	{
		socketfd = -1;
		socktype = SOCKETTYPE_UDP;
		srcip[0] = '\0';
		srcport = 0;
		destip[0] = '\0';
		destport = 0;
 
		sockethandle = NULL;
		pTcpRecvData = NULL;
		nTcpRecvLen = 0;
	}
}TUvSocketItem;
 
 
class CUvSocketTransMgr
{
public:
	CUvSocketTransMgr();
	~CUvSocketTransMgr();
 
public:
	bool BindSocket(const char _srcip[], int _srcport, SOCKETTYPE_E type, char* _destip = NULL, int _destport = 0);
	int MoveSocket(const char _srcip[],int _srcport,SOCKETTYPE_E type,const char* _destip,int _destport);
	
	virtual int RecvPacketCallBack(int sock,const char *pData, int len,const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort, bool bTcpRecv) = 0;
	int SendPacket(int sock,const char *pData, int Datalen,const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort, bool bTcpSend);
private:
	bool tcp4_echo_start(const char *ip, int port);
	bool udp4_echo_start(const char *ip, int port);
	bool run(int status = UV_RUN_DEFAULT);
 
	static void echo_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
	static void on_closeserver(uv_handle_t* handle);
	static void on_close(uv_handle_t* handle);
	static void after_write(uv_write_t* req, int status);
	static void after_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* rcvbuf);
	static void on_connection(uv_stream_t* tcpserver, int status);
	static void on_send(uv_udp_send_t* req, int status);
	static void on_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* rcvbuf, const struct sockaddr* addr, unsigned flags); 
private:
	void sip_tcp_sticky_packet(TUvSocketItem * pSocketItem, const char *pData, int len, const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort);
	
	TUvSocketItem *OpenAUvSocketItem(int socketfd, SOCKETTYPE_E type, char *srcip, int srcport, char *destip, int destport, void *handle);
	TUvSocketItem *FindAUvSocketItem(int socketfd);
	void RemoveAUvSocketItem(int socketfd);
	void RemoveAllUvSocketItem(void);
 
	std::map<int, TUvSocketItem*> m_UvSocketItemMap;                     //scoketfd ->  TUvSocketItem
	typedef std::map<int, TUvSocketItem*>::iterator UVSOCKETMAP_IT;
	typedef std::map<int, TUvSocketItem*>::value_type UVSOCKETMAP_VT;
public:
	uv_loop_t* loop;
	uv_tcp_t tcpServer;
	uv_udp_t udpServer;
	uv_handle_t* server;
 
	char m_serverip[IPSTR_MAX_LEN + 1];
	int  m_serverport;
 
	uv_mutex_t mutex_data_;//clients map mutex
private:
	uv_mutex_t mutex_sockets_;//clients map mutex
	uv_thread_t start_threadhandle_;//start thread handle
    static void StartThread(void* arg);//start thread,run until use close the server
    bool m_runing;
};
 
#endif 
 

源文件:

#include "UVSocketTransMgr.h"
 
 
CUvSocketTransMgr::CUvSocketTransMgr()
{
	loop = NULL;
	int iret = uv_mutex_init(&mutex_data_);
    if (iret) 
	{
		printf("%s: uv_mutex_init failed iret:%s\n", `ls_function` , uv_err_name(iret));
    }
    iret = uv_mutex_init(&mutex_sockets_);
    if (iret) 
	{
		printf("%s: uv_mutex_init failed iret:%s\n", `ls_function` , uv_err_name(iret));
    }
 
	m_serverip[0] = '\0';
	m_serverport = 0;
	
	m_UvSocketItemMap.clear();
	m_runing = false;
}
 
CUvSocketTransMgr::~CUvSocketTransMgr()
{
	RemoveAllUvSocketItem();
 
	uv_stop(loop);
	//pthread_exit(NULL);
	uv_thread_join(&start_threadhandle_);
	uv_loop_delete(loop);
	uv_mutex_destroy(&mutex_data_);
	uv_mutex_destroy(&mutex_sockets_);
}
 
void CUvSocketTransMgr::echo_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) 
{
	buf->base = (char *)malloc(suggested_size);
	buf->len = suggested_size;
}
 
void CUvSocketTransMgr::on_closeserver(uv_handle_t* handle) 
{
	printf("%s:%d >>>>>>>on_closeserver:%p\n", `ls_function` , `ls_line` , handle);
}
 
void CUvSocketTransMgr::on_close(uv_handle_t* handle) 
{	
	printf("%s:%d >>>>>>>on_close:%p\n", `ls_function` , `ls_line` , handle);
	free(handle);
}
 
void CUvSocketTransMgr::after_write(uv_write_t* req, int status) 
{
	write_req_t* wr;
	/* Free the read/write buffer and the request */
	wr = (write_req_t*) req;
	free(wr->buf.base);
	free(wr);
 
	if (status == 0)
		return;
 
	printf("%s: uv_write error: %s - %s\n", `ls_function` , uv_err_name(status), uv_strerror(status));
}
 
void CUvSocketTransMgr::after_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* rcvbuf) 
{
	uv_stream_t *tcpserver = (uv_stream_t *)client->data;
	if (tcpserver == NULL)
	{
		printf("%s:%d tcpserver is null\n", `ls_function` , `ls_line` );
		return;
	}
	CUvSocketTransMgr *socketmgr = (CUvSocketTransMgr *)tcpserver->data;
	if (socketmgr == NULL)
	{
		printf("%s:%d socketmgr is null\n", `ls_function` , `ls_line` );
		return;
	}
	if (nread > 0)
	{
		TUvSocketItem *pSocketItem = socketmgr->FindAUvSocketItem(client->io_watcher.fd);
		if (pSocketItem == NULL)
		{
			printf("%s:%d find a uvsocketitem socket:%d is null\n", `ls_function` , `ls_line` , client->io_watcher.fd);
			return;
		}
		if (pSocketItem != NULL)
		{
			// sticky sip tcp packet
			socketmgr->sip_tcp_sticky_packet(pSocketItem, rcvbuf->base, nread, pSocketItem->srcip, pSocketItem->srcport, pSocketItem->destip, pSocketItem->destport);
		}
	}
    if (nread < 0) 
	{        
		if (nread != UV_EOF) 
		{
			printf("%s:Read error %s\n", `ls_function` , uv_err_name(nread));   
		}
		socketmgr->RemoveAUvSocketItem(client->io_watcher.fd);
		uv_close((uv_handle_t*) client, on_close);    
	}
 
	free(rcvbuf->base);
	return;
}
 
 
void CUvSocketTransMgr::on_connection(uv_stream_t* tcpserver, int status) 
{
	if (status != 0) 
	{
		printf("%s: Connect error %s\n", `ls_function` , uv_err_name(status));
	}
	
	CUvSocketTransMgr *socketmgr = (CUvSocketTransMgr *)tcpserver->data;
	if (socketmgr == NULL)
	{
		printf("%s:%d socketmgr is null\n", `ls_function` , `ls_line` );
		return;
	}
 
	uv_stream_t *client = (uv_stream_t *)malloc(sizeof(uv_tcp_t));
	if (client == NULL)
	{	
		printf("%s:%d client malloc faild\n", `ls_function` , `ls_line` );
		return;
	}
	int iret = uv_tcp_init(socketmgr->loop, (uv_tcp_t*)client);
	if (iret)
	{
		printf("%s: uv_tcp_init failed iret:%s\n", `ls_function` , uv_err_name(iret));
		return;
	}
	/* associate server with stream */
	client->data = tcpserver;
 
	iret = uv_accept(tcpserver, client);
	if (iret == 0)
	{
		struct sockaddr_in serveraddr;	
		int serverlen = sizeof(struct sockaddr);    
		char svrip[IPSTR_MAX_LEN+1] = {0};
		uv_tcp_getsockname((uv_tcp_t*)client, (struct sockaddr*)&serveraddr, &serverlen);  
		inet_ntop(AF_INET, (struct in_addr *)&serveraddr.sin_addr.s_addr, svrip, IPSTR_MAX_LEN);
		int svrport = ntohs(serveraddr.sin_port);
 
		struct sockaddr_in clientaddr;	
		int clientlen = sizeof(struct sockaddr);	
		uv_tcp_getpeername((uv_tcp_t*)client, (struct sockaddr*)&clientaddr, &clientlen); 
		char clientip[IPSTR_MAX_LEN+1] = {0};
		inet_ntop(AF_INET, (struct in_addr *)&clientaddr.sin_addr.s_addr, clientip, IPSTR_MAX_LEN);  //使用线程安全函数
		int clientport = ntohs(clientaddr.sin_port);	
		// add socketitem
		socketmgr->OpenAUvSocketItem(client->io_watcher.fd, SOCKETTYPE_TCPSERVER, svrip, svrport, clientip, clientport, client);
		iret= uv_read_start(client, echo_alloc, after_read);
		if (iret)
		{
			printf("%s: uv_read_start failed iret:%s\n", `ls_function` , uv_err_name(iret));
			return;
		}
	}
	else
	{
		socketmgr->RemoveAUvSocketItem(client->io_watcher.fd);
		//close socket
		uv_close((uv_handle_t*)client, on_close);
	}
}
 
 
bool CUvSocketTransMgr::tcp4_echo_start(const char *ip, int port) 
{
	struct sockaddr_in addr;
	int iret;
	iret = uv_ip4_addr(ip, port, &addr);
	if (iret)
	{
		printf("%s: uv_ip4_addr failed iret:%s\n", `ls_function` , uv_err_name(iret));
		return false;
	}
	
	//server = (uv_handle_t*)&tcpServer;
 
	iret = uv_tcp_init(loop, &tcpServer);
	if (iret) 
	{
		printf("%s: uv_tcp_init failed iret:%s\n", `ls_function` , uv_err_name(iret));
		return false;
	}
 
	iret = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &addr, 0);
	if (iret) 
	{
		printf("%s: uv_tcp_bind failed iret:%s\n", `ls_function` , uv_err_name(iret));
		return false;
	}
 
	tcpServer.data = this;
	iret = uv_listen((uv_stream_t*)&tcpServer, UV_TCPSVR_LISTEN_MAXCONN, on_connection);
	if (iret) 
	{
		printf("%s: uv_listen failed iret:%s\n", `ls_function` , uv_err_name(iret));
		return false;
	}
 
	return true;
}
 
void CUvSocketTransMgr::on_send(uv_udp_send_t* req, int status) 
{
    if (status) 
	{
        printf("%s:Send error %s\n", `ls_function` , uv_strerror(status));
        return;
    }
	//if (status == 0)
	free(req);
}
 
void CUvSocketTransMgr::on_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* rcvbuf, const struct sockaddr* addr, unsigned flags)
{
	if (nread > 0)
	{
		CUvSocketTransMgr *socketmgr = (CUvSocketTransMgr *)handle->data;
		if (socketmgr == NULL)
		{
			return;
		}
		struct sockaddr_in svraddr;	
		int clientlen = sizeof(struct sockaddr);
		char svrip[IPSTR_MAX_LEN + 1] = {0};
		uv_udp_getsockname(handle, (struct sockaddr*)&svraddr, &clientlen);
		inet_ntop(AF_INET, (struct in_addr *)&svraddr.sin_addr.s_addr, svrip, IPSTR_MAX_LEN);  //使用线程安全函数		
		int svrport = ntohs(svraddr.sin_port);
 
 
		struct sockaddr_in *clientaddr = (struct sockaddr_in *)addr;	
		char dstip[IPSTR_MAX_LEN+1] = {0};
		inet_ntop(AF_INET, (struct in_addr *)&clientaddr->sin_addr.s_addr, dstip, IPSTR_MAX_LEN); 
		int dstport = ntohs(clientaddr->sin_port);
 
		//static int recvtimes = 0;
		//printf("1.[%d]on_recvxxxxxxxxxxxxsocketfd:%d, handle:%p, nread:%d src<%s:%d>, dst<%s:%d>\n", ++recvtimes, handle->io_watcher.fd, handle, nread, svrip, svrport, dstip, dstport);		
	
		//回调
		uv_mutex_lock(&socketmgr->mutex_data_);
		socketmgr->RecvPacketCallBack(handle->io_watcher.fd, rcvbuf->base, nread, svrip, svrport, dstip, dstport, false);
		uv_mutex_unlock(&socketmgr->mutex_data_);
		
		//free rcvbuf
		free(rcvbuf->base);
	}
}
 
bool CUvSocketTransMgr::udp4_echo_start(const char *ip, int port) 
{
	int iret;
	
	struct sockaddr_in addr;
	iret = uv_ip4_addr(ip, port, &addr);
	if (iret)
	{
		printf("%s: uv_ip4_addr failed iret:%s\n", `ls_function` , uv_err_name(iret));
		return false;
	}
	//server = (uv_handle_t*)&udpServer;
 
	iret = uv_udp_init(loop, &udpServer);
	if (iret) 
	{
		printf("%s: uv_udp_init failed iret:%s\n", `ls_function` , uv_err_name(iret));
		return false;
	}
	//uv_udp_nodelay
	//UV_UDP_REUSEADDR
	iret = uv_udp_bind(&udpServer, (const struct sockaddr *)&addr, UV_UDP_REUSEADDR);
	if (iret) 
	{
		printf("%s: uv_udp_bind failed iret:%s\n", `ls_function` , uv_err_name(iret));
		return false;
	}
	udpServer.data = this;
	
	iret = uv_udp_recv_start(&udpServer, echo_alloc, on_recv);
	if (iret) 
	{
		printf("%s: uv_udp_recv_start failed iret:%s\n", `ls_function` , uv_err_name(iret));
		return false;
	}
	OpenAUvSocketItem(udpServer.io_watcher.fd, SOCKETTYPE_UDP, m_serverip, m_serverport, NULL, 0, &udpServer);
	return true;
}
 
//bool CUvSocketTransMgr::Start(SOCKETTYPE_E socktype, const char* ip, int port)
bool CUvSocketTransMgr::BindSocket(const char _srcip[],int _srcport,SOCKETTYPE_E type, char* _destip,int _destport)
{
	if (strlen(_srcip) <= 0 || _srcport <= 0)
	{
		return false;
	}
    
    if (loop == NULL)
	{
		loop = uv_default_loop();
    }
	if (loop == NULL)
	{
		printf("%s: uv_default_loop failed\n", `ls_function` );
		return false;
    }
		
	strncpy2(m_serverip, _srcip, IPSTR_MAX_LEN);
    m_serverport = _srcport;
 
	if (type == SOCKETTYPE_UDP)
	{
		udp4_echo_start(_srcip, _srcport);
	}
	else if (type == SOCKETTYPE_TCPSERVER)
	{
		tcp4_echo_start(_srcip, _srcport);
	}
	else
	{
		return false;
	}
	if (!m_runing)
	{
	    int iret = uv_thread_create(&start_threadhandle_, StartThread, this);//use thread to wait for start succeed.
	    if (iret) 
		{
			printf("%s: uv_thread_create failed iret:%s\n", `ls_function` , uv_err_name(iret));
	        return false;
	    }
		m_runing = true;
	}
	return true;
}
 
int CUvSocketTransMgr::MoveSocket(const char _srcip[],int _srcport,SOCKETTYPE_E type,const char* _destip,int _destport)
{
	//close udp socket
	if (type == SOCKETTYPE_UDP)
	{
		uv_close((uv_handle_t*)&udpServer, on_closeserver);
		return 0;
	}
	else if (type == SOCKETTYPE_TCPSERVER)
	{
		if (m_UvSocketItemMap.size() > 0)
		{
			uv_mutex_lock(&mutex_sockets_);
			//close all tcp client
			TUvSocketItem *pSocketItem = NULL;
			UVSOCKETMAP_IT it;
			for (it=m_UvSocketItemMap.begin(); it != m_UvSocketItemMap.end(); ++it)
			{
				pSocketItem = (TUvSocketItem *)it->second;
				if (pSocketItem != NULL)
				{
					if (pSocketItem->socktype == SOCKETTYPE_TCPSERVER)
					{
						uv_close((uv_handle_t*)pSocketItem->sockethandle, on_close);  
						if (pSocketItem->pTcpRecvData != NULL)
						{
							SAFE_DELETE(pSocketItem->pTcpRecvData);
						}
					}
				}
			}
			uv_mutex_unlock(&mutex_sockets_);
		}
		printf(">>>>>tcpServer.io_watcher.fd:%d\n", tcpServer.io_watcher.fd);
		uv_close((uv_handle_t*)&tcpServer, on_closeserver);  
		//close(tcpServer.io_watcher.fd);
	}
	return 0;	
}
 
bool CUvSocketTransMgr::run(int status)
{
    printf("server runing.\n");
    int iret = uv_run(loop, (uv_run_mode)status);
    if (iret) 
	{
		printf("%s: uv_run failed iret:%s\n", `ls_function` , uv_err_name(iret));
        return false;
    }
    return true;
}
 
void CUvSocketTransMgr::StartThread(void* arg)
{
    CUvSocketTransMgr* socketmgr = (CUvSocketTransMgr*)arg;
    socketmgr->run();
}
 
void CUvSocketTransMgr::sip_tcp_sticky_packet(TUvSocketItem * pSocketItem, const char *pData, int len, const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort)
{
	if(len > 0)
	{
		char *end_sip = NULL;
		char *cl_header = NULL;
		int cl_size = 0;
		
		if(pSocketItem->pTcpRecvData != NULL)
		{
			/* concat old data with new data */
			pSocketItem->pTcpRecvData = (char *)realloc(pSocketItem->pTcpRecvData, pSocketItem->nTcpRecvLen+len+1);
			if(NULL == pSocketItem->pTcpRecvData)
			{
				printf("realloc pTcpRecvData failed!\n");
				pSocketItem->nTcpRecvLen = 0;
				return;
			}
			strncpy(pSocketItem->pTcpRecvData+pSocketItem->nTcpRecvLen, pData, len);
			pSocketItem->pTcpRecvData[pSocketItem->nTcpRecvLen+len] = '\0';
			pSocketItem->nTcpRecvLen = pSocketItem->nTcpRecvLen + len;
		}
		if(NULL == pSocketItem->pTcpRecvData)
		{
			pSocketItem->pTcpRecvData = (char *)malloc(len+1);
			if(pSocketItem->pTcpRecvData != NULL)
			{
				memset(pSocketItem->pTcpRecvData, 0, len+1); 
			}
			strncpy(pSocketItem->pTcpRecvData, pData, len);
			pSocketItem->pTcpRecvData[len] = '\0';	
			pSocketItem->nTcpRecvLen = len;
		}
		end_sip = strstr(pSocketItem->pTcpRecvData, "\r\n\r\n");
		while(end_sip != NULL)
		{
			cl_header = mystrcasestr(pSocketItem->pTcpRecvData, "\ncontent-length ");
			if (cl_header == NULL || cl_header > end_sip)
				cl_header = mystrcasestr(pSocketItem->pTcpRecvData,"\ncontent-length:");
			if (cl_header == NULL || cl_header > end_sip)
				cl_header =mystrcasestr(pSocketItem->pTcpRecvData,"\r\nl ");
			if (cl_header == NULL || cl_header > end_sip)
				cl_header =mystrcasestr(pSocketItem->pTcpRecvData,"\r\nl:");
 
			if (cl_header != NULL && cl_header < end_sip)
				cl_header = strchr(cl_header, ':');
			if (cl_header == NULL || cl_header >= end_sip) 
			{
					/* remove data up to crlfcrlf and restart */
				memmove(pSocketItem->pTcpRecvData, end_sip+4,pSocketItem->nTcpRecvLen -(end_sip + 4 -pSocketItem->pTcpRecvData) + 1);
				pSocketItem->nTcpRecvLen = pSocketItem->nTcpRecvLen - (end_sip +4 -pSocketItem->pTcpRecvData);
 
				pSocketItem->pTcpRecvData = (char *)realloc(pSocketItem->pTcpRecvData, pSocketItem->nTcpRecvLen+1);
 
				if(pSocketItem->pTcpRecvData == NULL)
				{
					pSocketItem->nTcpRecvLen = 0;
					break;
				}
				end_sip =strstr(pSocketItem->pTcpRecvData,"\r\n\r\n");
				continue;	/* and restart from new CRLFCRLF */
			}
			/* header content-length was found before CRLFCRLF -> all headers are available */
			cl_header++;	/* after ':' char */
			cl_size = atoi(cl_header);
			if (cl_size == 0|| (cl_size >0 && (end_sip + 4 + cl_size <=pSocketItem->pTcpRecvData+pSocketItem->nTcpRecvLen)))
			{
				uv_mutex_lock(&mutex_data_);
				RecvPacketCallBack(pSocketItem->socketfd,pSocketItem->pTcpRecvData, (end_sip + 4 + cl_size -pSocketItem->pTcpRecvData),\
					pSrcIP,SrcPort,pDestIP,DestPort, true);	
				uv_mutex_unlock(&mutex_data_);
				
				if (pSocketItem->nTcpRecvLen -(end_sip + 4 + cl_size -pSocketItem->pTcpRecvData) == 0) 
				{
					end_sip = NULL;
					pSocketItem->nTcpRecvLen = 0;
					free(pSocketItem->pTcpRecvData);
					pSocketItem->pTcpRecvData = NULL;
					continue;
				}
 
				memmove(pSocketItem->pTcpRecvData,end_sip + 4 + cl_size, pSocketItem->nTcpRecvLen -(end_sip + 4 + cl_size -pSocketItem->pTcpRecvData) + 1);
				pSocketItem->nTcpRecvLen = pSocketItem->nTcpRecvLen - (end_sip +4 +cl_size -pSocketItem->pTcpRecvData);
 
				pSocketItem->pTcpRecvData = (char *)realloc(pSocketItem->pTcpRecvData,pSocketItem->nTcpRecvLen + 1);
				if (pSocketItem->pTcpRecvData == NULL)
				{
					printf("realloc  pSocketItem->pTcpRecvDatapSocketItem->pTcpRecvDatapSocketItem->pTcpRecvData!!\n");
					pSocketItem->nTcpRecvLen = 0;
					break;
				}
				end_sip =strstr(pSocketItem->pTcpRecvData,"\r\n\r\n");
				continue;	/* and restart from new CRLFCRLF */
						
			}
			/* uncomplete SIP message */
			end_sip = NULL;		
		}
		if (pSocketItem->nTcpRecvLen== 0) 
		{
			/* all data consumed are reallocation error ? */
			return;
		}
	}
	else if(len< 0)
	{
		printf("Could not read socket (%d)- close it\n",pSocketItem->socketfd);
		//DelSocketItem(pSocketItem);
		//close(pSocketItem->sock);
	}
	else if (len == 0)
	{
		printf("End of stream (read 0 byte from %s:%i)\n", pSocketItem->srcip, pSocketItem->srcport);
		//DelSocketItem(pSocketItem);
		//close(pSocketItem->sock);
	}
	else
	{
		printf("Dummy SIP message received (size=%d)\n", len);
	}
 
}
 
int CUvSocketTransMgr::SendPacket(int sock,const char *pData, int Datalen,const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort, bool bTcpSend)
{
	if (sock <= 0)
	{
		struct sockaddr_in to;    
		bzero(&to, sizeof(struct sockaddr_in) );
		to.sin_family = AF_INET;
		to.sin_addr.s_addr = inet_addr(pDestIP);
		to.sin_port = htons(DestPort);
		
		int len = sendto(udpServer.io_watcher.fd, (char*)pData, Datalen, 0, (struct sockaddr *)&to, sizeof(struct sockaddr_in) );
		if (len != Datalen)
		{
			printf("sendto <%s:%d> error: len = %d, datalen = %d! \n", pDestIP, DestPort, len, Datalen);
		}
		return len;
	}
	int iret = -1;
	TUvSocketItem *pSocketItem = FindAUvSocketItem(sock);
	if (pSocketItem != NULL)
	{
		if (pSocketItem->socktype == SOCKETTYPE_UDP)	
		{
#if 1	
			struct sockaddr_in to;
    
			bzero(&to, sizeof(struct sockaddr_in) );
			to.sin_family = AF_INET;
			to.sin_addr.s_addr = inet_addr(pDestIP);
			to.sin_port = htons(DestPort);
			
			int len = sendto(sock, (char*)pData, Datalen, 0, (struct sockaddr *)&to, sizeof(struct sockaddr_in) );
			if (len != Datalen)
			{
				printf("sendto <%s:%d> error: len = %d, datalen = %d! \n", pDestIP, DestPort, len, Datalen);
			}
			
			return len;	
#else	
			//send udp
			struct sockaddr_in addr;
			iret = uv_ip4_addr(pDestIP, DestPort, &addr);
			if (iret)
			{
				printf("%s: uv_ip4_addr failed iret:%s\n", `ls_function` , uv_err_name(iret));
				return -1;
			}
			uv_udp_send_t* req;
			req = (uv_udp_send_t *)malloc(sizeof(*req));
			if (req == NULL)
			{
				return -1;
			}
 
			uv_buf_t sndbuf = uv_buf_init((char *)pData, Datalen);	
			iret = uv_udp_send(req, &udpServer, &sndbuf, 1, (struct sockaddr*)&addr, on_send);
			if (iret)
			{
				printf("%s: uv_udp_send failed iret:%s\n", `ls_function` , uv_err_name(iret));
				return -1;
			}
#endif	
		}
		else if (pSocketItem->socktype == SOCKETTYPE_TCPSERVER)
		{
#if 1		
			int len = send(sock, pData, Datalen, 0);
			return len;
#else			
			write_req_t *wr;
			wr = (write_req_t*) malloc(sizeof *wr);
			if (wr == NULL)
			{
				printf("%s:%d write_req_t malloc failed!\n", `ls_function` , `ls_line` );
				return -1;
			}
			wr->buf.base = (char *)malloc(Datalen + 1);
			wr->buf.len = Datalen;
			memcpy(wr->buf.base, pData, Datalen);
			wr->buf.base[Datalen] = '\0';	
			//wr->buf = uv_buf_init((char *)pData, Datalen);
			iret = uv_write(&wr->req, (uv_stream_t *)pSocketItem->sockethandle, &wr->buf, 1, after_write);
			if (iret) 
			{
				printf("%s: uv_write failed iret:%s\n", `ls_function` , uv_err_name(iret));
				return -1;
			}
#endif			
		}
	}
	return iret;
}
 
TUvSocketItem *CUvSocketTransMgr::OpenAUvSocketItem(int socketfd, SOCKETTYPE_E type, char *srcip, int srcport, char *destip, int destport, void *handle)
{
	uv_mutex_lock(&mutex_sockets_);
	TUvSocketItem *pSocketItem = NULL;
	UVSOCKETMAP_IT it;
	it = m_UvSocketItemMap.find(socketfd);
	if (it != m_UvSocketItemMap.end())
	{
		pSocketItem = (TUvSocketItem *)it->second;
		pSocketItem->socketfd = socketfd;
		pSocketItem->socktype = type;
		if (srcip != NULL)
			strncpy2(pSocketItem->srcip, srcip, IPSTR_MAX_LEN); 
		pSocketItem->srcport = srcport;
		if (destip != NULL)
			strncpy2(pSocketItem->destip, destip, IPSTR_MAX_LEN);  
		pSocketItem->destport = destport;
		pSocketItem->sockethandle = handle;
		printf("%s: find a uvsocket socket:%d, socktype:%d, srcip:%s, srcport:%d, destip:%s, destport:%d, sockethandle:%p", `ls_function` , \
			socketfd, type, srcip, srcport, destip, destport, handle);	
		uv_mutex_unlock(&mutex_sockets_);
		return pSocketItem;
	}
	pSocketItem = new TUvSocketItem();
	pSocketItem->socketfd = socketfd;
	pSocketItem->socktype = type;
	if (srcip != NULL)
		strncpy2(pSocketItem->srcip, srcip, IPSTR_MAX_LEN); 
	pSocketItem->srcport = srcport;
	if (destip != NULL)
		strncpy2(pSocketItem->destip, destip, IPSTR_MAX_LEN);  
	pSocketItem->destport = destport;
	pSocketItem->sockethandle = handle;
 
	m_UvSocketItemMap.insert(UVSOCKETMAP_VT(socketfd, pSocketItem));
	printf("%s: open a uvsocket socket:%d, socktype:%d, srcip:%s, srcport:%d, destip:%s, destport:%d, sockethandle:%p", `ls_function` , \
		socketfd, type, srcip, srcport, destip, destport, handle);
	uv_mutex_unlock(&mutex_sockets_);
	return pSocketItem;
}
 
TUvSocketItem *CUvSocketTransMgr::FindAUvSocketItem(int socketfd)
{
	uv_mutex_lock(&mutex_sockets_);
	UVSOCKETMAP_IT it;
	it = m_UvSocketItemMap.find(socketfd);
	if (it != m_UvSocketItemMap.end())
	{
		uv_mutex_unlock(&mutex_sockets_);
		return (TUvSocketItem *)it->second;
	}
	uv_mutex_unlock(&mutex_sockets_);
	return NULL;
}
 
void CUvSocketTransMgr::RemoveAUvSocketItem(int socketfd)
{
	uv_mutex_lock(&mutex_sockets_);
	UVSOCKETMAP_IT it;
	it = m_UvSocketItemMap.find(socketfd);
	if (it == m_UvSocketItemMap.end())
	{
		uv_mutex_unlock(&mutex_sockets_);
		return;
	}
	TUvSocketItem *pSocketItem = it->second;
	if (pSocketItem->pTcpRecvData != NULL)
	{
		SAFE_DELETE(pSocketItem->pTcpRecvData);
	}
	SAFE_DELETE(it->second);
	m_UvSocketItemMap.erase(it);
	uv_mutex_unlock(&mutex_sockets_);
	return;
}
 
void CUvSocketTransMgr::RemoveAllUvSocketItem(void)
{
	printf(">>>>>remove all socket item:%d\n", (int)m_UvSocketItemMap.size());
	uv_mutex_lock(&mutex_sockets_);
	if (m_UvSocketItemMap.size() <= 0)
	{
		uv_mutex_unlock(&mutex_sockets_);
		return;
	}
	UVSOCKETMAP_IT it;
	for (it=m_UvSocketItemMap.begin(); it != m_UvSocketItemMap.end();)
	{
		SAFE_DELETE(it->second);
	    m_UvSocketItemMap.erase(it++);
	}
	uv_mutex_unlock(&mutex_sockets_);
	return;
}

使用方法:

启动接口,支持udp和tcpserver:

bool BindSocket(const char _srcip[], int _srcport, SOCKETTYPE_E type, char* _destip = NULL, int _destport = 0);

关闭socket:

int MoveSocket(const char _srcip[],int _srcport,SOCKETTYPE_E type,const char* _destip,int _destport);

将数据回调出上层接口:

virtual int RecvPacketCallBack(int sock,const char *pData, int len,const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort, bool bTcpRecv) = 0;

发送数据:

int SendPacket(int sock,const char *pData, int Datalen,const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort, bool bTcpSend)

国标GB2818播放效果:

国标GB28181流媒体服务解决方案4G安防摄像头直播视频流媒体平台EasyGBS的libuv库通信协议封装-支持udp和tcpserver同时使用