欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

live555学习笔记【6】---客户端(一)

程序员文章站 2022-07-01 10:55:55
...

live555学习笔记【6】---客户端(一)

之前已经阅读了live555服务器的部分代码,其中也讲解了服务器对客户端各个RTSP命令的处理过程,下面我们来看看客户端是如何发送这些命令。testProgs中的OpenRTSP是典型的RTSPClient示例,所以分析它吧。
main()函数在playCommon.cpp文件中。main()的流程比较简单,跟服务端差别不大:建立任务计划对象--建立环境对象--处理用户输入的参数(RTSP地址)--创建RTSPClient实例--发出第一个RTSP请求(可能是OPTIONS也可能是DESCRIBE)--进入Loop。

RTSP的TCP连接是在发送第一个RTSP请求时才建立的,在RTSPClient的那几个发请求的函数sendXXXXXXCommand()中最终都调用sendRequest(),sendRequest()中会跟据情况建立起TCP连接。在建立连接时马上向任务计划中加入处理从这个TCP接收数据的socket handler:RTSPClient::incomingDataHandler()。
下面就是发送RTSP请求,OPTIONS就不必看了,从请求DESCRIBE开始:

void getSDPDescription(RTSPClient::responseHandler* afterFunc)
{
	ourRTSPClient->sendDescribeCommand(afterFunc, ourAuthenticator);
}
unsigned RTSPClient::sendDescribeCommand(responseHandler* responseHandler,
		Authenticator* authenticator)
{
	if (authenticator != NULL)
		fCurrentAuthenticator = *authenticator;
	return sendRequest(new RequestRecord(++fCSeq, "DESCRIBE", responseHandler));
}

参数responseHandler是调用者提供的回调函数,用于在处理完请求的回应后再调用之。并且在这个回调函数中会发出下一个请求--所有的请求都是这样依次发出的。使用回调函数的原因主要是因为socket的发送与接收不是同步进行的。类RequestRecord就代表一个请求,它不但保存了RTSP请求相关的信息,而且保存了请求完成后的回调函数--即responseHandler。有些请求发出时还没建立tcp连接,不能立即发送,则加入fRequestsAwaitingConnection队列;有些发出后要等待Server端的回应,就加入fRequestsAwaitingResponse队列,当收到回应后再从队列中把它取出。
下面我们就先来看看RTSPClient::sendRequest()里面的代码,其实无非是建立起RTSP请求字符串然后用TCP socket发送之。

unsigned RTSPClient::sendRequest(RequestRecord* request) {
  char* cmd = NULL;
  do {
    Boolean connectionIsPending = False;
    //判断客户端的请求是否需要等待
    if (!fRequestsAwaitingConnection.isEmpty()) {
      // A connection is currently pending (with at least one enqueued request).  Enqueue this request also:
      connectionIsPending = True;
    } 
    //是否需要新打开一个连接
    else if (fInputSocketNum < 0) { // we need to open a connection
      //这里面做了很多事情:解析地址->创建套接字,连接服务器->将socket加入fHandlers,循环获取状态并调用回调函数进行事件处理
      int connectResult = openConnection();
      if (connectResult < 0) break; // an error occurred
      else if (connectResult == 0) {
	// A connection is pending
        connectionIsPending = True;
      } // else the connection succeeded.  Continue sending the command.
    }
    if (connectionIsPending) {
      fRequestsAwaitingConnection.enqueue(request);
      return request->cseq();
    }

    // If requested (and we're not already doing it, or have done it), set up the special protocol for tunneling RTSP-over-HTTP:
    if (fTunnelOverHTTPPortNum != 0 && strcmp(request->commandName(), "GET") != 0 && fOutputSocketNum == fInputSocketNum) {
      if (!setupHTTPTunneling1()) break;
      fRequestsAwaitingHTTPTunneling.enqueue(request);
      return request->cseq();
    }

    // Construct and send the command:

    // First, construct command-specific headers that we need:

    char* cmdURL = fBaseURL; // by default
    Boolean cmdURLWasAllocated = False;

    char const* protocolStr = "RTSP/1.0"; // by default

    char* extraHeaders = (char*)""; // by default
    Boolean extraHeadersWereAllocated = False; 

    char* contentLengthHeader = (char*)""; // by default
    Boolean contentLengthHeaderWasAllocated = False;

    if (!setRequestFields(request,
			  cmdURL, cmdURLWasAllocated,
			  protocolStr,
			  extraHeaders, extraHeadersWereAllocated)) {
      break;
    }

    char const* contentStr = request->contentStr(); // by default
    if (contentStr == NULL) contentStr = "";
    unsigned contentStrLen = strlen(contentStr);
    if (contentStrLen > 0) {
      char const* contentLengthHeaderFmt =
	"Content-Length: %d\r\n";
      unsigned contentLengthHeaderSize = strlen(contentLengthHeaderFmt)
	+ 20 /* max int len */;
      contentLengthHeader = new char[contentLengthHeaderSize];
      sprintf(contentLengthHeader, contentLengthHeaderFmt, contentStrLen);
      contentLengthHeaderWasAllocated = True;
    }
    //组包,发送RTSP命令
    char* authenticatorStr = createAuthenticatorString(request->commandName(), fBaseURL);

    char const* const cmdFmt =
      "%s %s %s\r\n"
      "CSeq: %d\r\n"
      "%s"
      "%s"
      "%s"
      "%s"
      "\r\n"
      "%s";
    unsigned cmdSize = strlen(cmdFmt)
      + strlen(request->commandName()) + strlen(cmdURL) + strlen(protocolStr)
      + 20 /* max int len */
      + strlen(authenticatorStr)
      + fUserAgentHeaderStrLen
      + strlen(extraHeaders)
      + strlen(contentLengthHeader)
      + contentStrLen;
    cmd = new char[cmdSize];
    sprintf(cmd, cmdFmt,
	    request->commandName(), cmdURL, protocolStr,
	    request->cseq(),
	    authenticatorStr,
	    fUserAgentHeaderStr,
            extraHeaders,
	    contentLengthHeader,
	    contentStr);
    delete[] authenticatorStr;
    if (cmdURLWasAllocated) delete[] cmdURL;
    if (extraHeadersWereAllocated) delete[] extraHeaders;
    if (contentLengthHeaderWasAllocated) delete[] contentLengthHeader;

    if (fVerbosityLevel >= 1) envir() << "Sending request: " << cmd << "\n";

    if (fTunnelOverHTTPPortNum != 0 && strcmp(request->commandName(), "GET") != 0 && strcmp(request->commandName(), "POST") != 0) {
      // When we're tunneling RTSP-over-HTTP, we Base-64-encode the request before we send it.
      // (However, we don't do this for the HTTP "GET" and "POST" commands that we use to set up the tunnel.)
      char* origCmd = cmd;
      cmd = base64Encode(origCmd, strlen(cmd));
      if (fVerbosityLevel >= 1) envir() << "\tThe request was base-64 encoded to: " << cmd << "\n\n";
      delete[] origCmd;
    }

    if (send(fOutputSocketNum, cmd, strlen(cmd), 0) < 0) {
      char const* errFmt = "%s send() failed: ";
      unsigned const errLength = strlen(errFmt) + strlen(request->commandName());
      char* err = new char[errLength];
      sprintf(err, errFmt, request->commandName());
      envir().setResultErrMsg(err);
      delete[] err;
      break;
    }

    // The command send succeeded, so enqueue the request record, so that its response (when it comes) can be handled.
    // However, note that we do not expect a response to a POST command with RTSP-over-HTTP, so don't enqueue that.
    int cseq = request->cseq();

    if (fTunnelOverHTTPPortNum == 0 || strcmp(request->commandName(), "POST") != 0) {
      fRequestsAwaitingResponse.enqueue(request);
    } else {
      delete request;
    }

    delete[] cmd;
    return cseq;
  } while (0);

  // An error occurred, so call the response handler immediately (indicating the error):
  delete[] cmd;
  handleRequestError(request);
  delete request;
  return 0;
}

接下来我们来看一下收到DESCRIBE的回应后如何处理它。理论上是跟据媒体信息建立起MediaSession了,看看是不是这样:

void continueAfterDESCRIBE(RTSPClient*, int resultCode, char* resultString) {
  if (resultCode != 0) {
    *env << "Failed to get a SDP description for the URL \"" << streamURL << "\": " << resultString << "\n";
    delete[] resultString;
    shutdown();
  }

  char* sdpDescription = resultString;
  *env << "Opened URL \"" << streamURL << "\", returning a SDP description:\n" << sdpDescription << "\n";

  // Create a media session object from this SDP description:
  //根据服务器返回的SDP信息,创建一个会话
  session = MediaSession::createNew(*env, sdpDescription);
  delete[] sdpDescription;
  if (session == NULL) {
    *env << "Failed to create a MediaSession object from the SDP description: " << env->getResultMsg() << "\n";
    shutdown();
  } else if (!session->hasSubsessions()) {
    *env << "This session has no media subsessions (i.e., no \"m=\" lines)\n";
    shutdown();
  }

  // Then, setup the "RTPSource"s for the session:
  MediaSubsessionIterator iter(*session);
  MediaSubsession *subsession;
  Boolean madeProgress = False;
  char const* singleMediumToTest = singleMedium;
  while ((subsession = iter.next()) != NULL) {
    // If we've asked to receive only a single medium, then check this now:
    if (singleMediumToTest != NULL) {
      if (strcmp(subsession->mediumName(), singleMediumToTest) != 0) {
		  *env << "Ignoring \"" << subsession->mediumName()
			  << "/" << subsession->codecName()
			  << "\" subsession, because we've asked to receive a single " << singleMedium
			  << " session only\n";
	continue;
      } else {
	// Receive this subsession only
	singleMediumToTest = "xxxxx";
	    // this hack ensures that we get only 1 subsession of this type
      }
    }

    if (desiredPortNum != 0) {
      subsession->setClientPortNum(desiredPortNum);
      desiredPortNum += 2;
    }

    if (createReceivers) {
      //初始化subsession,在其中会建立RTP/RTCP socket以及RTPSource。
      if (!subsession->initiate(simpleRTPoffsetArg)) {
	*env << "Unable to create receiver for \"" << subsession->mediumName()
	     << "/" << subsession->codecName()
	     << "\" subsession: " << env->getResultMsg() << "\n";
      } else {
	*env << "Created receiver for \"" << subsession->mediumName()
	     << "/" << subsession->codecName() << "\" subsession (";
	if (subsession->rtcpIsMuxed()) {
	  *env << "client port " << subsession->clientPortNum();
	} else {
	  *env << "client ports " << subsession->clientPortNum()
	       << "-" << subsession->clientPortNum()+1;
	}
	*env << ")\n";
	madeProgress = True;
	
	if (subsession->rtpSource() != NULL) {
	  // Because we're saving the incoming data, rather than playing
	  // it in real time, allow an especially large time threshold
	  // (1 second) for reordering misordered incoming packets:
	  unsigned const thresh = 1000000; // 1 second
	  subsession->rtpSource()->setPacketReorderingThresholdTime(thresh);
	  
	  // Set the RTP source's OS socket buffer size as appropriate - either if we were explicitly asked (using -B),
	  // or if the desired FileSink buffer size happens to be larger than the current OS socket buffer size.
	  // (The latter case is a heuristic, on the assumption that if the user asked for a large FileSink buffer size,
	  // then the input data rate may be large enough to justify increasing the OS socket buffer size also.)
	  int socketNum = subsession->rtpSource()->RTPgs()->socketNum();
	  unsigned curBufferSize = getReceiveBufferSize(*env, socketNum);
	  if (socketInputBufferSize > 0 || fileSinkBufferSize > curBufferSize) {
	    unsigned newBufferSize = socketInputBufferSize > 0 ? socketInputBufferSize : fileSinkBufferSize;
	    newBufferSize = setReceiveBufferTo(*env, socketNum, newBufferSize);
	    if (socketInputBufferSize > 0) { // The user explicitly asked for the new socket buffer size; announce it:
	      *env << "Changed socket receive buffer size for the \""
		   << subsession->mediumName()
		   << "/" << subsession->codecName()
		   << "\" subsession from "
		   << curBufferSize << " to "
		   << newBufferSize << " bytes\n";
	    }
	  }
	}
      }
    } else {
      if (subsession->clientPortNum() == 0) {
	*env << "No client port was specified for the \""
	     << subsession->mediumName()
	     << "/" << subsession->codecName()
	     << "\" subsession.  (Try adding the \"-p <portNum>\" option.)\n";
      } else {
		madeProgress = True;
      }
    }
  }
  if (!madeProgress) shutdown();

  //下一步就是发送SETUP请求了。需要为每个Track分别发送一次。
  // Perform additional 'setup' on each subsession, before playing them:
  setupStreams();
}

上面的代码看起来非常长,实际上,去除一些打印,去除一些没必要的代码段之后是很短的,所以,希望大家还是可以把看阅读完。

getOptions(continueAfterOPTIONS);
      |---sendOptionsCommand(afterFunc, ourAuthenticator);//1
.........|---sendRequest(new..RequestRecord(++fCSeq,"OPTIONS",responseHandler));//2
.................|---openConnection();//3
.........................|---connectToServer()//4
..................................|---connect()//5
..................................|---taskScheduler().setBackgroundHandling(connectionHandler);//6
.........................|---taskScheduler().setBackgroundHandling(incomingDataHandler);//7
.................|---send(fOutputSocketNum, cmd, strlen(cmd), 0)//8
.................|---fRequestsAwaitingResponse.enqueue(request);//9

2:RTSP第一步,发送OPTIONS包。并将continueAfterOPTIONS回调函数一同传入,通过这些参数构建一个RequestRecord对象,这些传入的参数通过这个对象管理
3:第一次使用RTSP协议首先要建立一个TCP连接, fInputSocketNum = fOutputSocketNum = setupStreamSocket(envir(), 0);先建立socket,然后调用4 connect连接。因为这个套接字并不是阻塞的,所以connect之后可能握手还没有完成就返回了。所以这是要添加connectionHandler到任务中,完成connect操作,一次就好。7:建立完成之后将接受消息处理函数添加到后台任务中。incomin–Data–Handler
8:前期准备都做好了然后发送OPTIONS命令,等待sever返回,
9:将开始生成的RequestRecord对象添加到等待队列中,其中包含了“OPTIONS”和处理函数“continueAfterXXXXX”

注:在发送OPTIONS时建立好了TCP连接,并添加好了处理函数
然后程序会进入singleEvent()循环,检查任务。
当检测到该socket有可读信息时,调用incomingDataHandler
在handleResponseBytes中先解析返回的信息,然后取出 fRequestsAwaitingResponse队列里的RequestRecord对象,上面已经说了,该对象保存了接受完OPTIONS之后的处理函数continueAfterOPTIONS。
然后进入 foundRequest->handler()
 

为了方便大家理解,我这里再把整个客户端的RTSP的DESCRIBE命令处理的流程整理一下:

1、客户端主动组包发送DESCRIBE命令,第一次命令发送会创建socket套接字,并设置数据处理回调函数

2、收到服务器响应后,通过回调函数的方式,客户端使用回调函数对响应报文做相应的处理,包括:根据SDP信息创建会话、为每个子会话创建RTP/RTCP套接字以及fRTPSource

3、为每个子会话,发送SETUP命令

这里先休息一下,我们继续往下讲剩余的几个RTSP命令!

live555学习笔记【6】---客户端(一)

 

相关标签: RTSPClient RTSP