欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Reliable Multicast Programming(PGM)协议

程序员文章站 2022-03-20 22:06:52
Reliable Multicast Programming (PGM)实际通用可靠多播协议,在某种程度上保证多播的可靠性。是IP上层协议,和TCP还有UDP同级,工作在传输层。 在组播传输视频项目中,发现在网络较差的时候,组播传输视频性能下降迅猛,组播的视频几乎到了无法直视的地步,已经不是马赛克什 ......

reliable multicast programming (pgm)实际通用可靠多播协议,在某种程度上保证多播的可靠性。是ip上层协议,和tcp还有udp同级,工作在传输层。

在组播传输视频项目中,发现在网络较差的时候,组播传输视频性能下降迅猛,组播的视频几乎到了无法直视的地步,已经不是马赛克什么的问题了,简直就是一张臭抹布。

但是上面的要求是让接收端达到1080p 16fps的播放效果,此时组播接收端的实时网络速率只有50kb/s左右,这种情况下要从软件上处理的话(因为路由器不好换),需要让组播的丢包率降低才行,但是使用iperf测了下当时网络的丢包率,能丢到80%,丢到他姥姥家?

但是此时带宽利用率却很低,赶紧换成udp单播试了一下,速度能上去,也不怎么花屏了,不清楚是不是确认机制的问题。
但是总不能说把组播换成单播,当接入的接收端变多的时候,不清楚单播效果会不会也变差。

这个时候发现了pgm,“可靠”多播协议,有不少基于pgm实现的库,打算先用windows上的写个demo出来。

想要使用pgm需要先在网络适配器上安装协议,安装完成后会在属性中出现可靠多播协议
Reliable Multicast Programming(PGM)协议

然后就是开发了,官网文档提供的demo很棒,copy下来几乎就能跑起来。但是除了官网文档,相关资料就比较少了,头文件我还找了半天,环境上坑不少,记录一下

wsrm.h头文件

首先是windows sdk,我试了一下如果是8.1的sdk的话,是找不到wsrm.h头文件的,我有装10.0.17134.0,8.1还有10.0.15063.0三个版本的windows sdk,用everything找了一下这个头文件,得到了下面图示结果
Reliable Multicast Programming(PGM)协议

8.1应该是没有,剩下两个版本均可以使用,更新的版本应该也行。
Reliable Multicast Programming(PGM)协议

vs2017以上的话在visual studio installer里面修改多装个sdk就行了

传输速度

pgm本身也有发送窗口的概念,如果使用默认设置,窗口小,发送速度非常慢,每秒最多只有70kb左右,这时候需要设置socket选项
ratekbitspersec的单位是kilobits/s,是一个上限

    rm_send_window send_window;
    send_window.windowsizeinbytes = 8000 * 1000;
    send_window.windowsizeinmsecs = 1;
    send_window.ratekbitspersec = (send_window.windowsizeinbytes/send_window.windowsizeinmsecs)*8;


    int rc = setsockopt(s, ipproto_rm, rm_rate_window_size, (char *)&send_window, sizeof(send_window));
    if (rc == socket_error)
    {

        cout << "setsockopt(): rm_rate_window_size failed with error code " << wsagetlasterror() << endl;

    }

rm_send_window结构体就这么三个成员,第一个是每秒速度了,第二个是发送窗口的大小,第三个是窗口大小毫秒,其中windows会强制让
ratekbitspersec/8 = windowsizeinbytes * windowsizeinmsecs

ps:windowszieinmsecs的值需要调整,当windowsizeinbytes=8000并且windowszieinmsecs=1时,发送端较大概率会阻塞,原因未知,可能是发包速度过快导致

真的可靠么?

在文章一开始的时候可靠被加上了双引号,为的是表明这个协议并不是想象中的那么可靠。

发送窗口大小有限,如果需要恢复重传的数据在发送窗口之外了,那数据就是不可恢复的,一般当发送端速率过快接收端接收速度明显跟不上时,就会出现不可恢复现象。一旦出现不可恢复数据时,windows就会让接收端的连接重置,此时就不能继续接收。

源码

因为找不到对应的头文件让我着实头疼了很久,相关文档少,还不告诉我头文件是什么,这太不爽了,就好比让你看着门后面有啥,就是不给你钥匙。

pgm分为server端和client端,功能是发送文件,根据编写的

下面是server端代码

#define _winsock_deprecated_no_warnings
#define win32_lean_and_mean

#include <iostream>
#include<winsock2.h>
#include<ws2tcpip.h>    //ip_mreqͷ
#include <wsrm.h>
#include <stdio.h>

using namespace std;
#pragma comment(lib,"ws2_32.lib")

int main() {
    wsadata wsadata;
    word sockversion = makeword(2, 2);
    if (wsastartup(sockversion, &wsadata) != 0)
        return 0;
    
    file *fp;
    fopen_s(&fp, "test.webm", "rb+");

    socket        s;
    sockaddr_in   salocal, sasession;
    int           dwsessionport;

    s = socket(af_inet, sock_rdm, ipproto_rm);

    salocal.sin_family = af_inet;
    salocal.sin_port = htons(0);    // port is ignored here
    salocal.sin_addr.s_addr = htonl(inaddr_any);

    bind(s, (sockaddr *)&salocal, sizeof(salocal));

    //
    // set all relevant sender socket options here
    //

    //
    // now, connect <entity type="hellip"/>
    // setting the connection port (dwsessionport) has relevance, and
    // can be used to multiplex multiple sessions to the same
    // multicast group address over different ports
    //
    dwsessionport = 1234;
    sasession.sin_family = af_inet;
    sasession.sin_port = htons(dwsessionport);
    sasession.sin_addr.s_addr = inet_addr("224.4.5.6");

    rm_send_window send_window;
    send_window.windowsizeinbytes = 8000;
    send_window.windowsizeinmsecs = 1;
    send_window.ratekbitspersec = (send_window.windowsizeinbytes/send_window.windowsizeinmsecs)*8;

    int rc = setsockopt(s, ipproto_rm, rm_rate_window_size, (char *)&send_window, sizeof(send_window));
    if (rc == socket_error)
    {

        cout << "setsockopt(): rm_rate_window_size failed with error code " << wsagetlasterror() << endl;

    }
    connect(s, (sockaddr *)&sasession, sizeof(sasession));

    //
    // we're now ready to send data!
    //
    char psendbuffer[1400];

    sockaddr_in serveraddr;
    int iaddrlen = sizeof(serveraddr);




    while (1) {
        if (feof(fp))
            break;
        memset(psendbuffer, 0, 1400);

        int data_size = fread(psendbuffer, 1, 1400, fp);
        
        long        error;

        error = sendto(s, psendbuffer, data_size, 0, (sockaddr*)&serveraddr,iaddrlen);

        if (error == socket_error)
        {
            fprintf(stderr, "send() failed: error = %d\n",
                wsagetlasterror());
        }
    }

    wsacleanup();
    return 0;
}

下面是client端代码

#include <iostream>
#include<winsock2.h>
#include<ws2tcpip.h>    //ip_mreqͷ
#include <wsrm.h>
#include <stdio.h>

using namespace std;
#pragma comment(lib,"ws2_32.lib")

int main() {
    wsadata wsadata;
    word sockversion = makeword(2, 2);
    if (wsastartup(sockversion, &wsadata) != 0)
        return 0;


    socket        s,
        sclient;
    sockaddr_in   salocal,
        sasession;
    int           sasessionsz, dwsessionport;

    file * fp;
    fopen_s(&fp, "aaatest.webm", "wb+");

    s = socket(af_inet, sock_rdm, ipproto_rm);

    //
    // the bind port (dwsessionport) specified should match that
    // which the sender specified in the connect call
    //
    dwsessionport = 1234;
    salocal.sin_family = af_inet;
    salocal.sin_port = htons(dwsessionport);
    salocal.sin_addr.s_addr = inet_addr("224.4.5.6");
    int receive_buf_size = 65536 * 10;
    if (setsockopt(s, sol_socket, so_rcvbuf, (char*)&receive_buf_size, sizeof(receive_buf_size)) < 0)
    {
        std::cout << "setsockopt():so_rcvbuf failed with error code" << wsagetlasterror() << std::endl;
    }


    bind(s, (sockaddr *)&salocal, sizeof(salocal));

    //
    // set all relevant receiver socket options here
    //

    listen(s, 10);

    sasessionsz = sizeof(sasession);
    sclient = accept(s, (sockaddr *)&sasession, &sasessionsz);

    if (setsockopt(sclient, sol_socket, so_rcvbuf, (char*)&receive_buf_size, sizeof(receive_buf_size)) < 0)
    {
        std::cout << "setsockopt():so_rcvbuf failed with error code" << wsagetlasterror() << std::endl;
    }
    //
    // accept will return the client socket and we are now ready
    // to receive data on the new socket!
    //
    long bytesread;
    char ptestbuffer[1400];

    sockaddr_in clientaddr;
    int iaddrlen = sizeof(clientaddr);

    while (1)
    {
        memset(ptestbuffer, 0, 1400);
        cout << "start" << endl;
        bytesread = recvfrom(sclient, ptestbuffer, 1400, 0, (sockaddr*)&clientaddr, &iaddrlen);
        cout << "end" << endl;
        if (bytesread == 0)
        {
            fprintf(stdout, "session was terminated\n");
        }
        else if (bytesread == -1)
        {
            std::cout << "no data?!" << std::endl;
        }
        if (bytesread > 0)
        {
            fwrite(ptestbuffer, 1, bytesread, fp);
            std::cout << bytesread << std::endl;
        }
    }
    fclose(fp);
    wsacleanup();
    return 0;
}