欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SRS 代码分析【转发流实现】

程序员文章站 2022-07-08 20:06:39
...

SRS 代码分析【转发流实现】

转载:http://lib.csdn.net/article/liveplay/50671

publish的流和play的流怎么连接呢?这个恐怕是最绕的地方了。看了一上午的代码,淹没于各种数据结构与流程之中后,俺终于发现了连接publish和play的关键连个类是

SrsSource

SrsConsumer

负责连接着连个类实例的是

SrsRtmpConn 

下面我们详细讲解连接过程


上片我们说到。在底层客户端连接上来后,会经过一系列处理,最后绕到SrsRtmpConn类的循环函数中。就是下面的函数

int SrsConnection::cycle()
{
    int ret = ERROR_SUCCESS;
    
    _srs_context->generate_id();
    id = _srs_context->get_id();
    
    ip = srs_get_peer_ip(st_netfd_fileno(stfd));
    
    ret = do_cycle();
    
    // if socket io error, set to closed.
    if (srs_is_client_gracefully_close(ret)) {
        ret = ERROR_SOCKET_CLOSED;
    }
    
    // success.
    if (ret == ERROR_SUCCESS) {
        srs_trace("client finished.");
    }
    
    // client close peer.
    if (ret == ERROR_SOCKET_CLOSED) {
        srs_warn("client disconnect peer. ret=%d", ret);
    }

    return ERROR_SUCCESS;
}

这个是SrsRtmpConn的基类SrsConnection的函数。在基类里,do_cycle()是个纯虚函数。具体实现完全是靠这子类来的。

那么rtmp类型的这个子类,到底有多么的变态呢,先看看我画的一个流程图,都没有画完。一张放不下,的截好几张

SRS 代码分析【转发流实现】

SRS 代码分析【转发流实现】


SRS 代码分析【转发流实现】


SRS 代码分析【转发流实现】


够长的,这里我还只是画到了播放的时候,发布流程还没有画。因为太复杂了。


下面开始一步一步的分析

首先看do_cycle()函数这个函数主要负责握手和连命令。并在成功后。获取流的配置信息。关键代码如下

if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
        srs_error("rtmp handshake failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp handshake success");
    
    if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
        srs_error("rtmp connect vhost/app failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp connect app success");
注意这里有一个比较重要的数据结构

SrsRequest* req


这个主要是存储请求信息的,比如app turl streamid等等。

在各种分析后,进入下一个cycle,service_cycle()函数

service_cycly()函数在做了一些设置工作,设置比如chunk size。代码如下


 if ((ret = rtmp->set_window_ack_size((int)(2.5 * 1000 * 1000))) != ERROR_SUCCESS) {
        srs_error("set window acknowledgement size failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("set window acknowledgement size success");
        
    if ((ret = rtmp->set_peer_bandwidth((int)(2.5 * 1000 * 1000), 2)) != ERROR_SUCCESS) {
        srs_error("set peer bandwidth failed. ret=%d", ret);
        return ret;
    }

下面一段代码没有看明白。这个是一个补丁打上去的,说说为了做do token traverse。这个暂时先不研究了。

if (true) {
        bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
        bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
        if (vhost_is_edge && edge_traverse) {
            if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {
                srs_warn("token auth failed, ret=%d", ret);
                return ret;
            }
        }
    }

接着设置chunk 的大小

 int chunk_size = _srs_config->get_chunk_size(req->vhost);
    if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) {
        srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret);
        return ret;
    }

回应客户端。连接ok

 if ((ret = rtmp->response_connect_app(req, local_ip.c_str())) != ERROR_SUCCESS) {
        srs_error("response connect app failed. ret=%d", ret);
        return ret;
    }

然后连接就结束了,进入stream_service_cycle()函数,从名字上就可以看出。这个函数是开始就如流命令时代

    while (!disposed) {
        ret = stream_service_cycle();
        
        // stream service must terminated with error, never success.
        // when terminated with success, it's user required to stop.
        if (ret == ERROR_SUCCESS) {
            continue;
        }
        
        // when not system control error, fatal error, return.
        if (!srs_is_system_control_error(ret)) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("stream service cycle failed. ret=%d", ret);
            }
            return ret;
        }
        
        // for republish, continue service
        if (ret == ERROR_CONTROL_REPUBLISH) {
            // set timeout to a larger value, wait for encoder to republish.
            rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);
            rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);
            
            srs_trace("control message(unpublish) accept, retry stream service.");
            continue;
        }
        
        // for "some" system control error, 
        // logical accept and retry stream service.
        if (ret == ERROR_CONTROL_RTMP_CLOSE) {
            // TODO: FIXME: use ping message to anti-death of socket.
            // @see: https://github.com/ossrs/srs/issues/39
            // set timeout to a larger value, for user paused.
            rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);
            rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);
            
            srs_trace("control message(close) accept, retry stream service.");
            continue;
        }
        
        // for other system control message, fatal error.
        srs_error("control message(%d) reject as error. ret=%d", ret, ret);
        return ret;
    }

stream_service_cycle()函数闪亮登场

首先进行一些安全验证

f ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) {
        if (!srs_is_client_gracefully_close(ret)) {
            srs_error("identify client failed. ret=%d", ret);
        }
        return ret;
    }
    req->strip();
    srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f", 
        srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration);
    
    // security check
    if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
        srs_error("security check failed. ret=%d", ret);
        return ret;
    }
    srs_info("security check ok");

然后进入比较有意思的环节

 SrsSource* source = SrsSource::fetch(req);
    if (!source) {
        if ((ret = SrsSource::create(req, server, server, &source)) != ERROR_SUCCESS) {
            return ret;
        }
    }
    srs_assert(source != NULL);

根据req,寻找是否有这个源,如果没有,那么久创建一个。主要creat()是个静态函数。实现代码为

int SrsSource::create(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh, SrsSource** pps)
{
    int ret = ERROR_SUCCESS;
    
    string stream_url = r->get_stream_url();
    string vhost = r->vhost;
    
    // should always not exists for create a source.
    srs_assert (pool.find(stream_url) == pool.end());

    SrsSource* source = new SrsSource();
    if ((ret = source->initialize(r, h, hh)) != ERROR_SUCCESS) {
        srs_freep(source);
        return ret;
    }
        
    pool[stream_url] = source;
    srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
    
    *pps = source;
    
    return ret;
}

创建一个新的source,并且放到poo中。pool是什么

 static std::map<std::string, SrsSource*> pool;

也是一个全局的静态变量,用了存储所欲的源。到此。谜底进一步解开了。

同意fetch()函数也是静态的。

 static SrsSource* fetch(SrsRequest* r);
 static SrsSource* fetch(std::string vhost, std::string app, std::string stream);
ok!在绕道循环函数看看接下来该怎么办

SrsStatistic* stat = SrsStatistic::instance();
    if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) {
        srs_error("stat client failed. ret=%d", ret);
        return ret;
    }

这个是做统计用的,没啥。

bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
    bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
    srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
        req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, 
        source->source_id(), source->source_id());
    source->set_cache(enabled_cache);
判断是否是边缘节点,是否需要gop缓冲。无他

 switch (type) {
        case SrsRtmpConnPlay: {
            srs_verbose("start to play stream %s.", req->stream.c_str());
            
            // response connection start play
            if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to play stream failed. ret=%d", ret);
                return ret;
            }
            if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
                srs_error("http hook on_play failed. ret=%d", ret);
                return ret;
            }
            
            srs_info("start to play stream %s success", req->stream.c_str());
            ret = playing(source);
            http_hooks_on_stop();
            
            return ret;
        }
        case SrsRtmpConnFMLEPublish: {
            srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
            
            if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to publish stream failed. ret=%d", ret);
                return ret;
            }
            
            return publishing(source);
        }
        case SrsRtmpConnFlashPublish: {
            srs_verbose("flash start to publish stream %s.", req->stream.c_str());
            
            if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("flash start to publish stream failed. ret=%d", ret);
                return ret;
            }
            
            return publishing(source);
        }
        default: {
            ret = ERROR_SYSTEM_CLIENT_INVALID;
            srs_info("invalid client type=%d. ret=%d", type, ret);
            return ret;
        }
    }

大流程看。好像是根据不同走到了发布或者播放流程里。但首先。这个type是从哪里来的。怎么没有发现呢?

int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string& stream_name, double& duration)
在这个函数里做确认类型的。rmptserver类不在我们这次分析。


我们分析下play的流程,函数名称为

int SrsRtmpConn::playing(SrsSource* source)
关键代码

 SrsConsumer* consumer = NULL;
    if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) {
        srs_error("create consumer failed. ret=%d", ret);
        return ret;
    }
    SrsAutoFree(SrsConsumer, consumer);
    srs_verbose("consumer created success.");

 利用source创建一个consumer.创建代码为

int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
{
    int ret = ERROR_SUCCESS;
    
    consumer = new SrsConsumer(this, conn);
    consumers.push_back(consumer);
    
    double queue_size = _srs_config->get_queue_length(_req->vhost);
    consumer->set_queue_size(queue_size);
    
    // if atc, update the sequence header to gop cache time.
    if (atc && !gop_cache->empty()) {
        if (cache_metadata) {
            cache_metadata->timestamp = gop_cache->start_time();
        }
        if (cache_sh_video) {
            cache_sh_video->timestamp = gop_cache->start_time();
        }
        if (cache_sh_audio) {
            cache_sh_audio->timestamp = gop_cache->start_time();
        }
    }
    
    // copy metadata.
    if (dm && cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        srs_error("dispatch metadata failed. ret=%d", ret);
        return ret;
    }
    srs_info("dispatch metadata success");
    
    // copy sequence header
    // copy audio sequence first, for hls to fast parse the "right" audio codec.
    // @see https://github.com/ossrs/srs/issues/301
    if (ds && cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        srs_error("dispatch audio sequence header failed. ret=%d", ret);
        return ret;
    }
    srs_info("dispatch audio sequence header success");

    if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        srs_error("dispatch video sequence header failed. ret=%d", ret);
        return ret;
    }
    srs_info("dispatch video sequence header success");
    
    // copy gop cache to client.
    if (dg && (ret = gop_cache->dump(consumer, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        return ret;
    }
    
    // print status.
    if (dg) {
        srs_trace("create consumer, queue_size=%.2f, jitter=%d", queue_size, jitter_algorithm);
    } else {
        srs_trace("create consumer, ignore gop cache, jitter=%d", jitter_algorithm);
    }

    // for edge, when play edge stream, check the state
    if (_srs_config->get_vhost_is_edge(_req->vhost)) {
        // notice edge to start for the first client.
        if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) {
            srs_error("notice edge start play stream failed. ret=%d", ret);
            return ret;
        }
    }
    
    return ret;
}

代码好长。主要是创建 放进数据结构中,并拷贝一些metadata进去,对于edge的处理,还没有看明白。

之后的动作

SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP);
    
    // start isolate recv thread.
    if ((ret = trd.start()) != ERROR_SUCCESS) {
        srs_error("start isolate recv thread failed. ret=%d", ret);
        return ret;
    }

什么?单独创建了一个接受线程,实现了recv never send,send never recv,据说这样效率提高了33%

在绕回去

    // delivery messages for clients playing stream.
    wakable = consumer;
    ret = do_playing(source, consumer, &trd);
    wakable = NULL;
进入下一个循环体do_playing()

在分析下一个函数之前。让我总结下缩做的工作

1)创建或者获取了一个source

2)创建一个consumer

3) 创建一个接受线程


下面开始看函数的关键代码

// setup the realtime.
    realtime = _srs_config->get_realtime_enabled(req->vhost);
    // setup the mw config.
    // when mw_sleep changed, resize the socket send buffer.
    mw_enabled = true;
    change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
    // initialize the send_min_interval
    send_min_interval = _srs_config->get_send_min_interval(req->vhost);

做实时性 merge write 的设置

  while (!trd->empty()) {
            SrsCommonMessage* msg = trd->pump();
            srs_verbose("pump client message to process.");
            
            if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
                if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
                    srs_error("process play control message failed. ret=%d", ret);
                }
                return ret;
            }
        }
首先处理接受消息。主要是暂停的消息。

下面进入核心代码

<strong><span style="color:#ff6666;"> int count = (send_min_interval > 0)? 1 : 0;
        if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
            srs_error("get messages from consumer failed. ret=%d", ret);
            return ret;
        }</span></strong>
这段代码的作用就是,把消息,从consumer里,拷贝到本地的msg队列里。当然。这个拷贝是浅拷贝,只是指针过来了。

首先看msgs的定义

SrsMessageArray msgs(SRS_PERF_MW_MSGS)

这个类里有个核心变量
 SrsSharedPtrMessage** msgs;
可以看到它保存的是一个指向指针的指针。

那么dump_packets是怎么实现的呢?

int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
{
    int ret =ERROR_SUCCESS;
    
    srs_assert(count >= 0);
    srs_assert(msgs->max > 0);
    
    // the count used as input to reset the max if positive.
    int max = count? srs_min(count, msgs->max) : msgs->max;
    
    // the count specifies the max acceptable count,
    // here maybe 1+, and we must set to 0 when got nothing.
    count = 0;
    
    if (should_update_source_id) {
        srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id());
        should_update_source_id = false;
    }
    
    // paused, return nothing.
    if (paused) {
        return ret;
    }

    // pump msgs from queue.
    if ((ret = queue->dump_packets(max, msgs->msgs, count)) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
}


int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
{
    int ret = ERROR_SUCCESS;
    
    int nb_msgs = (int)msgs.size();
    if (nb_msgs <= 0) {
        return ret;
    }
    
    srs_assert(max_count > 0);
    count = srs_min(max_count, nb_msgs);

    SrsSharedPtrMessage** omsgs = msgs.data();
    for (int i = 0; i < count; i++) {
        pmsgs[i] = omsgs[i];
    }
    
    SrsSharedPtrMessage* last = omsgs[count - 1];
    av_start_time = last->timestamp;
    
    if (count >= nb_msgs) {
        // the pmsgs is big enough and clear msgs at most time.
        msgs.clear();
    } else {
        // erase some vector elements may cause memory copy,
        // maybe can use more efficient vector.swap to avoid copy.
        // @remark for the pmsgs is big enough, for instance, SRS_PERF_MW_MSGS 128,
        //      the rtmp play client will get 128msgs once, so this branch rarely execute.
        msgs.erase(msgs.begin(), msgs.begin() + count);
    }
    
    return ret;
}

ok代码我就不想分析了。只是个指针拷贝。

下一个问题。consumer的数据是怎么来的呢?

看source的代码,比如音频数据

int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)

代码里有这么一段

 // copy to all consumer
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsConsumer* consumer = consumers.at(i);
            if ((ret = consumer->enqueue(msg, atc, jitter_algorithm)) != ERROR_SUCCESS) {
                srs_error("dispatch the audio failed. ret=%d", ret);
                return ret;
            }
        }
        srs_info("dispatch audio success.");
    }

到此,一个循环就结束了。
相关标签: RTMP服务器