SRS 代码分析【转发流实现】
SRS 代码分析【转发流实现】
转载:http://lib.csdn.net/article/liveplay/50671
publish的流和play的流怎么连接呢?这个恐怕是最绕的地方了。看了一上午的代码,淹没于各种数据结构与流程之中后,俺终于发现了连接publish和play的关键连个类是
SrsSource
SrsConsumer
负责连接着连个类实例的是
SrsRtmpConn
下面我们详细讲解连接过程
上片我们说到。在底层客户端连接上来后,会经过一系列处理,最后绕到SrsRtmpConn类的循环函数中。就是下面的函数
int SrsConnection::cycle()
{
int ret = ERROR_SUCCESS;
_srs_context->generate_id();
id = _srs_context->get_id();
ip = srs_get_peer_ip(st_netfd_fileno(stfd));
ret = do_cycle();
// if socket io error, set to closed.
if (srs_is_client_gracefully_close(ret)) {
ret = ERROR_SOCKET_CLOSED;
}
// success.
if (ret == ERROR_SUCCESS) {
srs_trace("client finished.");
}
// client close peer.
if (ret == ERROR_SOCKET_CLOSED) {
srs_warn("client disconnect peer. ret=%d", ret);
}
return ERROR_SUCCESS;
}
这个是SrsRtmpConn的基类SrsConnection的函数。在基类里,do_cycle()是个纯虚函数。具体实现完全是靠这子类来的。
那么rtmp类型的这个子类,到底有多么的变态呢,先看看我画的一个流程图,都没有画完。一张放不下,的截好几张
够长的,这里我还只是画到了播放的时候,发布流程还没有画。因为太复杂了。
下面开始一步一步的分析
首先看do_cycle()函数这个函数主要负责握手和连命令。并在成功后。获取流的配置信息。关键代码如下
if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
srs_error("rtmp handshake failed. ret=%d", ret);
return ret;
}
srs_verbose("rtmp handshake success");
if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
srs_error("rtmp connect vhost/app failed. ret=%d", ret);
return ret;
}
srs_verbose("rtmp connect app success");
注意这里有一个比较重要的数据结构
SrsRequest* req
在各种分析后,进入下一个cycle,service_cycle()函数
service_cycly()函数在做了一些设置工作,设置比如chunk size。代码如下
if ((ret = rtmp->set_window_ack_size((int)(2.5 * 1000 * 1000))) != ERROR_SUCCESS) {
srs_error("set window acknowledgement size failed. ret=%d", ret);
return ret;
}
srs_verbose("set window acknowledgement size success");
if ((ret = rtmp->set_peer_bandwidth((int)(2.5 * 1000 * 1000), 2)) != ERROR_SUCCESS) {
srs_error("set peer bandwidth failed. ret=%d", ret);
return ret;
}
下面一段代码没有看明白。这个是一个补丁打上去的,说说为了做do token traverse。这个暂时先不研究了。
if (true) {
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
if (vhost_is_edge && edge_traverse) {
if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {
srs_warn("token auth failed, ret=%d", ret);
return ret;
}
}
}
接着设置chunk 的大小
int chunk_size = _srs_config->get_chunk_size(req->vhost);
if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) {
srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret);
return ret;
}
回应客户端。连接ok
if ((ret = rtmp->response_connect_app(req, local_ip.c_str())) != ERROR_SUCCESS) {
srs_error("response connect app failed. ret=%d", ret);
return ret;
}
然后连接就结束了,进入stream_service_cycle()函数,从名字上就可以看出。这个函数是开始就如流命令时代
while (!disposed) {
ret = stream_service_cycle();
// stream service must terminated with error, never success.
// when terminated with success, it's user required to stop.
if (ret == ERROR_SUCCESS) {
continue;
}
// when not system control error, fatal error, return.
if (!srs_is_system_control_error(ret)) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("stream service cycle failed. ret=%d", ret);
}
return ret;
}
// for republish, continue service
if (ret == ERROR_CONTROL_REPUBLISH) {
// set timeout to a larger value, wait for encoder to republish.
rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);
rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);
srs_trace("control message(unpublish) accept, retry stream service.");
continue;
}
// for "some" system control error,
// logical accept and retry stream service.
if (ret == ERROR_CONTROL_RTMP_CLOSE) {
// TODO: FIXME: use ping message to anti-death of socket.
// @see: https://github.com/ossrs/srs/issues/39
// set timeout to a larger value, for user paused.
rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);
rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);
srs_trace("control message(close) accept, retry stream service.");
continue;
}
// for other system control message, fatal error.
srs_error("control message(%d) reject as error. ret=%d", ret, ret);
return ret;
}
stream_service_cycle()函数闪亮登场
首先进行一些安全验证
f ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("identify client failed. ret=%d", ret);
}
return ret;
}
req->strip();
srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f",
srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration);
// security check
if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
srs_error("security check failed. ret=%d", ret);
return ret;
}
srs_info("security check ok");
然后进入比较有意思的环节
SrsSource* source = SrsSource::fetch(req);
if (!source) {
if ((ret = SrsSource::create(req, server, server, &source)) != ERROR_SUCCESS) {
return ret;
}
}
srs_assert(source != NULL);
根据req,寻找是否有这个源,如果没有,那么久创建一个。主要creat()是个静态函数。实现代码为
int SrsSource::create(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh, SrsSource** pps)
{
int ret = ERROR_SUCCESS;
string stream_url = r->get_stream_url();
string vhost = r->vhost;
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
SrsSource* source = new SrsSource();
if ((ret = source->initialize(r, h, hh)) != ERROR_SUCCESS) {
srs_freep(source);
return ret;
}
pool[stream_url] = source;
srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
*pps = source;
return ret;
}
创建一个新的source,并且放到poo中。pool是什么
static std::map<std::string, SrsSource*> pool;
也是一个全局的静态变量,用了存储所欲的源。到此。谜底进一步解开了。
同意fetch()函数也是静态的。
static SrsSource* fetch(SrsRequest* r);
static SrsSource* fetch(std::string vhost, std::string app, std::string stream);
ok!在绕道循环函数看看接下来该怎么办
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) {
srs_error("stat client failed. ret=%d", ret);
return ret;
}
这个是做统计用的,没啥。
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge,
source->source_id(), source->source_id());
source->set_cache(enabled_cache);
判断是否是边缘节点,是否需要gop缓冲。无他
switch (type) {
case SrsRtmpConnPlay: {
srs_verbose("start to play stream %s.", req->stream.c_str());
// response connection start play
if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to play stream failed. ret=%d", ret);
return ret;
}
if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
srs_error("http hook on_play failed. ret=%d", ret);
return ret;
}
srs_info("start to play stream %s success", req->stream.c_str());
ret = playing(source);
http_hooks_on_stop();
return ret;
}
case SrsRtmpConnFMLEPublish: {
srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to publish stream failed. ret=%d", ret);
return ret;
}
return publishing(source);
}
case SrsRtmpConnFlashPublish: {
srs_verbose("flash start to publish stream %s.", req->stream.c_str());
if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) {
srs_error("flash start to publish stream failed. ret=%d", ret);
return ret;
}
return publishing(source);
}
default: {
ret = ERROR_SYSTEM_CLIENT_INVALID;
srs_info("invalid client type=%d. ret=%d", type, ret);
return ret;
}
}
大流程看。好像是根据不同走到了发布或者播放流程里。但首先。这个type是从哪里来的。怎么没有发现呢?
int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string& stream_name, double& duration)
在这个函数里做确认类型的。rmptserver类不在我们这次分析。
我们分析下play的流程,函数名称为
int SrsRtmpConn::playing(SrsSource* source)
关键代码
SrsConsumer* consumer = NULL;
if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) {
srs_error("create consumer failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsConsumer, consumer);
srs_verbose("consumer created success.");
利用source创建一个consumer.创建代码为
int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
{
int ret = ERROR_SUCCESS;
consumer = new SrsConsumer(this, conn);
consumers.push_back(consumer);
double queue_size = _srs_config->get_queue_length(_req->vhost);
consumer->set_queue_size(queue_size);
// if atc, update the sequence header to gop cache time.
if (atc && !gop_cache->empty()) {
if (cache_metadata) {
cache_metadata->timestamp = gop_cache->start_time();
}
if (cache_sh_video) {
cache_sh_video->timestamp = gop_cache->start_time();
}
if (cache_sh_audio) {
cache_sh_audio->timestamp = gop_cache->start_time();
}
}
// copy metadata.
if (dm && cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch metadata failed. ret=%d", ret);
return ret;
}
srs_info("dispatch metadata success");
// copy sequence header
// copy audio sequence first, for hls to fast parse the "right" audio codec.
// @see https://github.com/ossrs/srs/issues/301
if (ds && cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch audio sequence header failed. ret=%d", ret);
return ret;
}
srs_info("dispatch audio sequence header success");
if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch video sequence header failed. ret=%d", ret);
return ret;
}
srs_info("dispatch video sequence header success");
// copy gop cache to client.
if (dg && (ret = gop_cache->dump(consumer, atc, jitter_algorithm)) != ERROR_SUCCESS) {
return ret;
}
// print status.
if (dg) {
srs_trace("create consumer, queue_size=%.2f, jitter=%d", queue_size, jitter_algorithm);
} else {
srs_trace("create consumer, ignore gop cache, jitter=%d", jitter_algorithm);
}
// for edge, when play edge stream, check the state
if (_srs_config->get_vhost_is_edge(_req->vhost)) {
// notice edge to start for the first client.
if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) {
srs_error("notice edge start play stream failed. ret=%d", ret);
return ret;
}
}
return ret;
}
代码好长。主要是创建 放进数据结构中,并拷贝一些metadata进去,对于edge的处理,还没有看明白。
之后的动作
SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP);
// start isolate recv thread.
if ((ret = trd.start()) != ERROR_SUCCESS) {
srs_error("start isolate recv thread failed. ret=%d", ret);
return ret;
}
什么?单独创建了一个接受线程,实现了recv never send,send never recv,据说这样效率提高了33%
在绕回去
// delivery messages for clients playing stream.
wakable = consumer;
ret = do_playing(source, consumer, &trd);
wakable = NULL;
进入下一个循环体do_playing()
在分析下一个函数之前。让我总结下缩做的工作
1)创建或者获取了一个source
2)创建一个consumer
3) 创建一个接受线程
下面开始看函数的关键代码
// setup the realtime.
realtime = _srs_config->get_realtime_enabled(req->vhost);
// setup the mw config.
// when mw_sleep changed, resize the socket send buffer.
mw_enabled = true;
change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
// initialize the send_min_interval
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
做实时性 merge write 的设置
while (!trd->empty()) {
SrsCommonMessage* msg = trd->pump();
srs_verbose("pump client message to process.");
if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
srs_error("process play control message failed. ret=%d", ret);
}
return ret;
}
}
首先处理接受消息。主要是暂停的消息。
下面进入核心代码
<strong><span style="color:#ff6666;"> int count = (send_min_interval > 0)? 1 : 0;
if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret);
return ret;
}</span></strong>
这段代码的作用就是,把消息,从consumer里,拷贝到本地的msg队列里。当然。这个拷贝是浅拷贝,只是指针过来了。
首先看msgs的定义
SrsMessageArray msgs(SRS_PERF_MW_MSGS)
这个类里有个核心变量
SrsSharedPtrMessage** msgs;
可以看到它保存的是一个指向指针的指针。
那么dump_packets是怎么实现的呢?
int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
{
int ret =ERROR_SUCCESS;
srs_assert(count >= 0);
srs_assert(msgs->max > 0);
// the count used as input to reset the max if positive.
int max = count? srs_min(count, msgs->max) : msgs->max;
// the count specifies the max acceptable count,
// here maybe 1+, and we must set to 0 when got nothing.
count = 0;
if (should_update_source_id) {
srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id());
should_update_source_id = false;
}
// paused, return nothing.
if (paused) {
return ret;
}
// pump msgs from queue.
if ((ret = queue->dump_packets(max, msgs->msgs, count)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
{
int ret = ERROR_SUCCESS;
int nb_msgs = (int)msgs.size();
if (nb_msgs <= 0) {
return ret;
}
srs_assert(max_count > 0);
count = srs_min(max_count, nb_msgs);
SrsSharedPtrMessage** omsgs = msgs.data();
for (int i = 0; i < count; i++) {
pmsgs[i] = omsgs[i];
}
SrsSharedPtrMessage* last = omsgs[count - 1];
av_start_time = last->timestamp;
if (count >= nb_msgs) {
// the pmsgs is big enough and clear msgs at most time.
msgs.clear();
} else {
// erase some vector elements may cause memory copy,
// maybe can use more efficient vector.swap to avoid copy.
// @remark for the pmsgs is big enough, for instance, SRS_PERF_MW_MSGS 128,
// the rtmp play client will get 128msgs once, so this branch rarely execute.
msgs.erase(msgs.begin(), msgs.begin() + count);
}
return ret;
}
ok代码我就不想分析了。只是个指针拷贝。
下一个问题。consumer的数据是怎么来的呢?
看source的代码,比如音频数据
int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
代码里有这么一段
// copy to all consumer
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsConsumer* consumer = consumers.at(i);
if ((ret = consumer->enqueue(msg, atc, jitter_algorithm)) != ERROR_SUCCESS) {
srs_error("dispatch the audio failed. ret=%d", ret);
return ret;
}
}
srs_info("dispatch audio success.");
}
到此,一个循环就结束了。
上一篇: 最冷三九天 中老年人最易冷出5种病