live555支持单播和组播,咱们先分析单播的流媒体服务端,后面再分析组播的流媒体服务端。
1、单播的流媒体服务端:
// Create the RTSP server: RTSPServer* rtspServer = NULL; // Normal case: Streaming from a built-in RTSP server: rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL); if (rtspServer == NULL) { *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n"; exit(1); } *env << "...done initializing \n"; if( streamingMode == STREAMING_UNICAST ) { ServerMediaSession* sms = ServerMediaSession::createNew(*env, H264StreamName[video_type], H264StreamName[video_type], streamDescription, streamingMode == STREAMING_MULTICAST_SSM); sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type], H264VideoBitrate)); sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type])); rtspServer->addServerMediaSession(sms); char *url = rtspServer->rtspURL(sms); *env << "Play this stream using the URL:\t" << url << "\n"; delete[] url; }
// Begin the LIVE555 event loop:
env->taskScheduler().doEventLoop(&watchVariable); // does not returnapp
咱们一步一步分析:
1> rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);
// Factory for RTSPServer.
//
// Sets up the server socket for `ourPort` via setUpOurSocket() and, on
// success, returns a newly allocated RTSPServer that takes over that socket.
// Returns NULL when the socket could not be set up (setUpOurSocket() returned
// -1).
//
// The earlier form declared `ourSocket` twice (once outside and once inside a
// do/while(0)); the inner declaration shadowed the outer one, so the
// "close the socket on failure" code after the loop could never run. The
// only two outcomes were "socket == -1 -> NULL" and "socket ok -> new
// server", which is what is written here directly.
RTSPServer* RTSPServer::createNew(UsageEnvironment& env, Port ourPort,
                                  UserAuthenticationDatabase* authDatabase,
                                  unsigned reclamationTestSeconds) {
  int ourSocket = setUpOurSocket(env, ourPort);
  if (ourSocket == -1) return NULL;

  return new RTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds);
}
此函数首先创建一个rtsp协议的socket,并且监听rtspServerPortNum端口,然后创建RTSPServer类的实例。下面咱们看下RTSPServer的构造函数:
// Constructs the server around an already-created socket.
//
// Stores the socket, port, authentication database and reclamation interval,
// creates the string-keyed hash table that holds the server's media sessions,
// and registers incomingConnectionHandler() with the task scheduler as the
// background read handler for the server socket.
RTSPServer::RTSPServer(UsageEnvironment& env,
                       int ourSocket, Port ourPort,
                       UserAuthenticationDatabase* authDatabase,
                       unsigned reclamationTestSeconds)
  : Medium(env),
    fServerSocket(ourSocket), fServerPort(ourPort),
    fAuthDB(authDatabase), fReclamationTestSeconds(reclamationTestSeconds),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fSessionIdCounter(0) {
#ifdef USE_SIGNALS
  // Ignore the SIGPIPE signal, so that clients on the same host that are killed
  // don't also kill us:
  signal(SIGPIPE, SIG_IGN);
#endif

  // Arrange to handle connections from others:
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket,
        (TaskScheduler::BackgroundHandlerProc*)&incomingConnectionHandler,
        this);
}
RTSPServer构造函数,初始化fServerMediaSessions为创建的HashTable,初始化fServerSocket为咱们前面创建的tcp socket,fServerPort为咱们监听的端口rtspServerPortNum,并且向taskScheduler注册fServerSocket的任务函数incomingConnectionHandler,这个任务函数主要监听是否有新的客户端连接(accept),如果有新的客户端接入,就创建RTSPClientSession的实例。
RTSPClientSession要提供什么功能呢?可以想象:需要监听客户端的rtsp请求并回应它,需要在DESCRIBE请求中返回所请求的流的信息,需要在SETUP请求中建立起RTP会话,需要在TEARDOWN请求中关闭RTP会话,等等...
// Constructs the per-client session object for one accepted connection.
//
// Records the owning server, the session id, the client socket and address,
// starts with no media session and no stream states, resets the request
// buffer, registers incomingRequestHandler() with the task scheduler as the
// background read handler for the client socket, and finally calls
// noteLiveness().
RTSPServer::RTSPClientSession::RTSPClientSession(RTSPServer& ourServer,
                                                 unsigned sessionId,
                                                 int clientSocket,
                                                 struct sockaddr_in clientAddr)
  : fOurServer(ourServer), fOurSessionId(sessionId),
    fOurServerMediaSession(NULL),
    fClientSocket(clientSocket), fClientAddr(clientAddr),
    fLivenessCheckTask(NULL),
    fIsMulticast(False), fSessionIsActive(True), fStreamAfterSETUP(False),
    fTCPStreamIdCount(0), fNumStreamStates(0), fStreamStates(NULL) {
  // Arrange to handle incoming requests:
  resetRequestBuffer();
  envir().taskScheduler().turnOnBackgroundReadHandling(fClientSocket,
        (TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler,
        this);
  noteLiveness();
}
上面这个函数是RTSPClientSession的构造函数,初始化sessionId为++fSessionIdCounter,初始化fClientSocket为accept创建的socket(clientSocket),初始化fClientAddr为accept接收的客户端地址,也向taskScheduler注册了fClientSocket的任务函数incomingRequestHandler。
incomingRequestHandler会调用incomingRequestHandler1,incomingRequestHandler1函数定义如下:
void RTSPServer::RTSPClientSession::incomingRequestHandler1() { noteLiveness(); struct sockaddr_in dummy; // 'from' address, meaningless in this case Boolean endOfMsg = False; unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen]; int bytesRead = readSocket(envir(), fClientSocket, ptr, fRequestBufferBytesLeft, dummy); if (bytesRead <= 0 || (unsigned)bytesRead >= fRequestBufferBytesLeft) { // Either the client socket has died, or the request was too big for us. // Terminate this connection: #ifdef DEBUG fprintf(stderr, "RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes (of %d); terminating connection!\n", this, bytesRead, fRequestBufferBytesLeft); #endif delete this; return; } #ifdef DEBUG ptr[bytesRead] = '\0'; fprintf(stderr, "RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes:%s\n", this, bytesRead, ptr); #endif // Look for the end of the message: <CR><LF><CR><LF> unsigned char *tmpPtr = ptr; if (fRequestBytesAlreadySeen > 0) --tmpPtr; // in case the last read ended with a <CR> while (tmpPtr < &ptr[bytesRead-1]) { if (*tmpPtr == '\r' && *(tmpPtr+1) == '\n') { if (tmpPtr - fLastCRLF == 2) { // This is it: endOfMsg = 1; break; } fLastCRLF = tmpPtr; } ++tmpPtr; } fRequestBufferBytesLeft -= bytesRead; fRequestBytesAlreadySeen += bytesRead; if (!endOfMsg) return; // subsequent reads will be needed to complete the request // Parse the request string into command name and 'CSeq', // then handle the command: fRequestBuffer[fRequestBytesAlreadySeen] = '\0'; char cmdName[RTSP_PARAM_STRING_MAX]; char urlPreSuffix[RTSP_PARAM_STRING_MAX]; char urlSuffix[RTSP_PARAM_STRING_MAX]; char cseq[RTSP_PARAM_STRING_MAX]; if (!parseRTSPRequestString((char*)fRequestBuffer, fRequestBytesAlreadySeen, cmdName, sizeof cmdName, urlPreSuffix, sizeof urlPreSuffix, urlSuffix, sizeof urlSuffix, cseq, sizeof cseq)) { #ifdef DEBUG fprintf(stderr, "parseRTSPRequestString() failed!\n"); #endif handleCmd_bad(cseq); } else { #ifdef DEBUG fprintf(stderr, 
"parseRTSPRequestString() returned cmdName \"%s\", urlPreSuffix \"%s\", urlSuffix \"%s\"\n", cmdName, urlPreSuffix, urlSuffix); #endif if (strcmp(cmdName, "OPTIONS") == 0) { handleCmd_OPTIONS(cseq); } else if (strcmp(cmdName, "DESCRIBE") == 0) { printf("incomingRequestHandler1 ~~~~~~~~~~~~~~\n"); handleCmd_DESCRIBE(cseq, urlSuffix, (char const*)fRequestBuffer); } else if (strcmp(cmdName, "SETUP") == 0) { handleCmd_SETUP(cseq, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer); } else if (strcmp(cmdName, "TEARDOWN") == 0 || strcmp(cmdName, "PLAY") == 0 || strcmp(cmdName, "PAUSE") == 0 || strcmp(cmdName, "GET_PARAMETER") == 0) { handleCmd_withinSession(cmdName, urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer); } else { handleCmd_notSupported(cseq); } } #ifdef DEBUG fprintf(stderr, "sending response: %s", fResponseBuffer); #endif send(fClientSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0); if (strcmp(cmdName, "SETUP") == 0 && fStreamAfterSETUP) { // The client has asked for streaming to commence now, rather than after a // subsequent "PLAY" command. So, simulate the effect of a "PLAY" command: handleCmd_withinSession("PLAY", urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer); } resetRequestBuffer(); // to prepare for any subsequent request if (!fSessionIsActive) delete this; }
此函数,咱们能够看到rtsp的协议的各个命令的接收处理和应答。
2> ServerMediaSession* sms = ServerMediaSession::createNew(... ...)
建立ServerMediaSession类的实例,初始化fStreamName为"h264_ch1",fInfoSDPString为"h264_ch1",fDescriptionSDPString为"RTSP/RTP stream from NETRA",fMiscSDPLines为null,fCreationTime获取的时间,fIsSSM为false。
3> sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(... ...);
WISH264VideoServerMediaSubsession::createNew():这个函数的主要目的是建立OnDemandServerMediaSubsession类的实例,这个类在前面已经分析,是单播时候必须建立的,初始化fWISInput为*H264InputDevice[video_type]。
sms->addSubsession() 是将WISH264VideoServerMediaSubsession类的实例加入到fSubsessionsTail链表首节点中。
4> sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(... ...);
WISPCMAudioServerMediaSubsession::createNew():这个函数的主要目的是建立OnDemandServerMediaSubsession类的实例,这个类在前面已经分析,是单播时候必须建立的,初始化fWISInput为*H264InputDevice[video_type]。
sms->addSubsession() 是将WISPCMAudioServerMediaSubsession类的实例加入到fSubsessionsTail->fNext中。
5> rtspServer->addServerMediaSession(sms)
将sms加入到rtspServer的fServerMediaSessions哈希表中。
6> env->taskScheduler().doEventLoop(&watchVariable);
这个doEventLoop在前面已经分析过,主要处理socket任务和延迟任务。
2、组播的流媒体服务器:
// Create the RTSP server: RTSPServer* rtspServer = NULL; // Normal case: Streaming from a built-in RTSP server: rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL); if (rtspServer == NULL) { *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n"; exit(1); } *env << "...done initializing \n"; if( streamingMode == STREAMING_UNICAST ) { ... ... } else { if (streamingMode == STREAMING_MULTICAST_SSM) { if (multicastAddress == 0) multicastAddress = chooseRandomIPv4SSMAddress(*env); } else if (multicastAddress != 0) { streamingMode = STREAMING_MULTICAST_ASM; } struct in_addr dest; dest.s_addr = multicastAddress; const unsigned char ttl = 255; // For RTCP: const unsigned maxCNAMElen = 100; unsigned char CNAME[maxCNAMElen + 1]; gethostname((char *) CNAME, maxCNAMElen); CNAME[maxCNAMElen] = '\0'; // just in case ServerMediaSession* sms; sms = ServerMediaSession::createNew(*env, H264StreamName[video_type], H264StreamName[video_type], streamDescription,streamingMode == STREAMING_MULTICAST_SSM); /* VIDEO Channel initial */ if(1) { // Create 'groupsocks' for RTP and RTCP: const Port rtpPortVideo(videoRTPPortNum); const Port rtcpPortVideo(videoRTPPortNum+1); rtpGroupsockVideo = new Groupsock(*env, dest, rtpPortVideo, ttl); rtcpGroupsockVideo = new Groupsock(*env, dest, rtcpPortVideo, ttl); if (streamingMode == STREAMING_MULTICAST_SSM) { rtpGroupsockVideo->multicastSendOnly(); rtcpGroupsockVideo->multicastSendOnly(); } setVideoRTPSinkBufferSize(); sinkVideo = H264VideoRTPSink::createNew(*env, rtpGroupsockVideo,96, 0x42, "h264"); // Create (and start) a 'RTCP instance' for this RTP sink: unsigned totalSessionBandwidthVideo = (Mpeg4VideoBitrate+500)/1000; // in kbps; for RTCP b/w share rtcpVideo = RTCPInstance::createNew(*env, rtcpGroupsockVideo, totalSessionBandwidthVideo, CNAME, sinkVideo, NULL /* we're a server */ , streamingMode == STREAMING_MULTICAST_SSM); // Note: This starts RTCP running automatically 
sms->addSubsession(PassiveServerMediaSubsession::createNew(*sinkVideo, rtcpVideo)); sourceVideo = H264VideoStreamFramer::createNew(*env, H264InputDevice[video_type]->videoSource());
// Start streaming: sinkVideo->startPlaying(*sourceVideo, NULL, NULL); } /* AUDIO Channel initial */ if(1) { // there's a separate RTP stream for audio // Create 'groupsocks' for RTP and RTCP: const Port rtpPortAudio(audioRTPPortNum); const Port rtcpPortAudio(audioRTPPortNum+1); rtpGroupsockAudio = new Groupsock(*env, dest, rtpPortAudio, ttl); rtcpGroupsockAudio = new Groupsock(*env, dest, rtcpPortAudio, ttl); if (streamingMode == STREAMING_MULTICAST_SSM) { rtpGroupsockAudio->multicastSendOnly(); rtcpGroupsockAudio->multicastSendOnly(); } if( audioSamplingFrequency == 16000 ) sinkAudio = SimpleRTPSink::createNew(*env, rtpGroupsockAudio, 96, audioSamplingFrequency, "audio", "PCMU", 1); else sinkAudio = SimpleRTPSink::createNew(*env, rtpGroupsockAudio, 0, audioSamplingFrequency, "audio", "PCMU", 1); // Create (and start) a 'RTCP instance' for this RTP sink: unsigned totalSessionBandwidthAudio = (audioOutputBitrate+500)/1000; // in kbps; for RTCP b/w share rtcpAudio = RTCPInstance::createNew(*env, rtcpGroupsockAudio, totalSessionBandwidthAudio, CNAME, sinkAudio, NULL /* we're a server */, streamingMode == STREAMING_MULTICAST_SSM); // Note: This starts RTCP running automatically sms->addSubsession(PassiveServerMediaSubsession::createNew(*sinkAudio, rtcpAudio)); sourceAudio = H264InputDevice[video_type]->audioSource(); // Start streaming: sinkAudio->startPlaying(*sourceAudio, NULL, NULL); } rtspServer->addServerMediaSession(sms); { struct in_addr dest; dest.s_addr = multicastAddress; char *url = rtspServer->rtspURL(sms); //char *url2 = inet_ntoa(dest); *env << "Mulicast Play this stream using the URL:\n\t" << url << "\n"; //*env << "2 Mulicast addr:\n\t" << url2 << "\n"; delete[] url; } } // Begin the LIVE555 event loop: env->taskScheduler().doEventLoop(&watchVariable); // does not return
1> rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);
同前面单播的分析同样。
2> sms = ServerMediaSession::createNew(... ...)
同前面单播的分析同样。
3> 视频
1. 建立视频rtp、rtcp的Groupsock类的实例,实现rtp和rtcp的udp通讯socket。这里应该了解下ASM和SSM。
2. 建立RTPSink类的实例,实现视频数据的RTP打包传输。
3. 建立RTCPInstance类的实例,实现RTCP打包传输。
4. 建立PassiveServerMediaSubsession类的实例,并加入到fSubsessionsTail链表中的首节点。
5. 建立FramedSource类的实例,实现一帧视频数据的获取。
6. 开始发送RTP和RTCP数据到组播地址。
4> 音频
1. 建立音频rtp、rtcp的Groupsock类的实例,实现rtp和rtcp的udp通讯socket。这里应该了解下ASM和SSM。
2. 建立RTPSink类的实例,实现音频数据的RTP打包传输。
3. 建立RTCPInstance类的实例,实现RTCP打包传输。
4. 建立PassiveServerMediaSubsession类的实例,并加入到fSubsessionsTail链表中的下一个节点。
5. 建立FramedSource类的实例,实现一帧音频数据的获取。
6. 开始发送RTP和RTCP数据到组播地址。
5> rtspServer->addServerMediaSession(sms)
同前面单播的分析同样。
6> env->taskScheduler().doEventLoop(&watchVariable)
同前面单播的分析同样。
3、单播和组播的区别
1> 建立socket的时候,组播一开始就建立了,而单播的则是根据收到的“SETUP”命令建立相应的socket。
2> startPlaying的时候,组播一开始就发送数据到组播地址,而单播则是根据收到的“PLAY”命令开始startPlaying。
4、startPlaying分析
首先分析组播:
sinkVideo->startPlaying()实现不在H264VideoRTPSink类中,也不在RTPSink类中,而是在MediaSink类中实现:
// Attaches `source` to this sink and starts consuming from it.
//
// Fails (returns False and sets the environment's result message) when the
// sink already has a source or when sourceIsCompatibleWithUs() rejects the
// source. Otherwise stores the source and the "after playing" callback with
// its client data, and returns whatever continuePlaying() returns.
Boolean MediaSink::startPlaying(MediaSource& source,
                                afterPlayingFunc* afterFunc,
                                void* afterClientData) {
  // Make sure we're not already being played:
  if (fSource != NULL) {
    envir().setResultMsg("This sink is already being played");
    return False;
  }

  // Make sure our source is compatible:
  if (!sourceIsCompatibleWithUs(source)) {
    envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
    return False;
  }
  fSource = (FramedSource*)&source;

  fAfterFunc = afterFunc;
  fAfterClientData = afterClientData;

  return continuePlaying();
}
这里发现调用了continuePlaying()函数,那这个函数在哪里实现的呢?由于sinkVideo是经过 H264VideoRTPSink::createNew()实现,返回的H264VideoRTPSink类的实例,所以咱们能够断定这个continuePlaying()在H264VideoRTPSink类实现。
// On first use, wraps the current source in an H264FUAFragmenter and makes
// the fragmenter the sink's source; then defers to
// MultiFramedRTPSink::continuePlaying().
Boolean H264VideoRTPSink::continuePlaying() {
  // First, check whether we have a 'fragmenter' class set up yet.
  // If not, create it now:
  if (fOurFragmenter == NULL) {
    fOurFragmenter = new H264FUAFragmenter(envir(), fSource, OutPacketBuffer::maxSize,
                                           ourMaxPacketSize() - 12/*RTP hdr size*/);
    fSource = fOurFragmenter;
  }
  //printf("function=%s line=%d\n",__func__,__LINE__);

  // Then call the parent class's implementation:
  return MultiFramedRTPSink::continuePlaying();
}
看到这里咱们发现调用的是MultiFramedRTPSink类的成员函数continuePlaying,看下这个函数的实现:
// Kicks off transmission by building and sending the first packet; always
// returns True.
Boolean MultiFramedRTPSink::continuePlaying() {
  // Send the first packet.
  // (This will also schedule any future sends.)
  buildAndSendPacket(True);
  return True;
}
这里咱们发现了buildAndSendPacket(),这个函数实现:
// Prepares the header of a new outgoing RTP packet, leaving holes for the
// fields that depend on the payload (timestamp, payload-specific header),
// resets the per-packet counters and then starts packing frames via
// packFrame().
void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) {
  fIsFirstPacket = isFirstPacket;

  // Set up the RTP header:
  unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later)
  rtpHdr |= (fRTPPayloadType << 16);
  rtpHdr |= fSeqNo; // sequence number
  fOutBuf->enqueueWord(rtpHdr); // append one word to the packet

  // Note where the RTP timestamp will go.
  // (We can't fill this in until we start packing payload frames.)
  fTimestampPosition = fOutBuf->curPacketSize();
  fOutBuf->skipBytes(4); // leave a hole for the timestamp

  fOutBuf->enqueueWord(SSRC());

  // Allow for a special, payload-format-specific header following the
  // RTP header:
  fSpecialHeaderPosition = fOutBuf->curPacketSize();
  fSpecialHeaderSize = specialHeaderSize();
  fOutBuf->skipBytes(fSpecialHeaderSize);

  // Begin packing as many (complete) frames into the packet as we can:
  fTotalFrameSpecificHeaderSizes = 0;
  fNoFramesLeft = False;
  fNumFramesUsedSoFar = 0; // number of frames packed into this packet so far

  // The header is ready; now pack the frame data:
  packFrame();
}
继续看packFrame():
void MultiFramedRTPSink::packFrame() { // First, see if we have an overflow frame that was too big for the last pkt if (fOutBuf->haveOverflowData()) { //若是有帧数据,则使用之。OverflowData是指上次打包时剩下的帧数据,由于一个包可能容纳不了一个帧。 // Use this frame before reading a new one from the source unsigned frameSize = fOutBuf->overflowDataSize(); struct timeval presentationTime = fOutBuf->overflowPresentationTime(); unsigned durationInMicroseconds =fOutBuf->overflowDurationInMicroseconds(); fOutBuf->useOverflowData(); afterGettingFrame1(frameSize, 0, presentationTime,durationInMicroseconds); } else { //一点帧数据都没有,跟source要吧。 // Normal case: we need to read a new frame from the source if (fSource == NULL) return; //更新缓冲中的一些位置 fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize(); fCurFrameSpecificHeaderSize = frameSpecificHeaderSize(); fOutBuf->skipBytes(fCurFrameSpecificHeaderSize); fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize; //从source获取下一帧 fSource->getNextFrame(fOutBuf->curPtr(),//新数据存放开始的位置 fOutBuf->totalBytesAvailable(),//缓冲中空余的空间大小 afterGettingFrame, //由于可能source中的读数据函数会被放在任务调度中,因此把获取帧后应调用的函数传给source this, ourHandleClosure, //这个是source结束时(好比文件读完了)要调用的函数。 this); } }
fSource定义在MediaSink类中,在这个类的startPlaying()函数中,fSource被赋值为传入的参数sourceVideo。sourceVideo的getNextFrame()函数在FramedSource中实现,它本身不是虚函数,而是在内部调用由子类实现的虚函数doGetNextFrame():
void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize, afterGettingFunc* afterGettingFunc, void* afterGettingClientData, onCloseFunc* onCloseFunc, void* onCloseClientData) { // Make sure we're not already being read: if (fIsCurrentlyAwaitingData) { envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n"; exit(1); } fTo = to; fMaxSize = maxSize; fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame() fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame() fAfterGettingFunc = afterGettingFunc; fAfterGettingClientData = afterGettingClientData; fOnCloseFunc = onCloseFunc; fOnCloseClientData = onCloseClientData; fIsCurrentlyAwaitingData = True; doGetNextFrame(); }
sourceVideo通过H264VideoStreamFramer::createNew()实例化,因此doGetNextFrame()函数实现在H264VideoStreamFramer类中:
void H264VideoStreamFramer::doGetNextFrame() { //fParser->registerReadInterest(fTo, fMaxSize); //continueReadProcessing(); fInputSource->getNextFrame(fTo, fMaxSize, afterGettingFrame, this, FramedSource::handleClosure, this); }
这fInputSource在H264VideoStreamFramer的基类StreamParser中被初始化为传入的参数H264InputDevice[video_type]->videoSource(),VideoOpenFileSource类继承OpenFileSource类,所以这个doGetNextFrame会再一次调用FramedSource类中的getNextFrame()函数,此次getNextFrame函数中调用的doGetNextFrame()函数则是在OpenFileSource类实现的。下面给出的是OpenFileSource类中实际读取数据的incomingDataHandler1()函数:
void OpenFileSource::incomingDataHandler1() { int ret; if (!isCurrentlyAwaitingData()) return; // we're not ready for the data yet ret = readFromFile(); if (ret < 0) { handleClosure(this); fprintf(stderr,"In Grab Image, the source stops being readable!!!!\n"); } else if (ret == 0) { if( uSecsToDelay >= uSecsToDelayMax ) { uSecsToDelay = uSecsToDelayMax; }else{ uSecsToDelay *= 2; } nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)incomingDataHandler, this); } else { nextTask() = envir().taskScheduler().scheduleDelayedTask(0, (TaskFunc*)afterGetting, this); } }
获取一帧数据后,执行延迟队列中的afterGetting()函数,此函数在父类FramedSource中实现:
// Completion step for a read on `source`: clears the "awaiting data" flag
// and, if a callback was registered, invokes it with the frame size, the
// number of truncated bytes, the presentation time and the duration.
void FramedSource::afterGetting(FramedSource* source) {
  source->fIsCurrentlyAwaitingData = False;
      // indicates that we can be read again
      // Note that this needs to be done here, in case the "fAfterFunc"
      // called below tries to read another frame (which it usually will)

  if (source->fAfterGettingFunc != NULL) {
    (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
                                   source->fFrameSize,
                                   source->fNumTruncatedBytes,
                                   source->fPresentationTime,
                                   source->fDurationInMicroseconds);
  }
}
fAfterGettingFunc函数指针在getNextFrame()函数被赋值,在MultiFramedRTPSink::packFrame() 函数中,被赋值MultiFramedRTPSink::afterGettingFrame():
void MultiFramedRTPSink::afterGettingFrame(void* clientData, unsigned numBytesRead, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds) { MultiFramedRTPSink* sink = (MultiFramedRTPSink*)clientData; sink->afterGettingFrame1(numBytesRead, numTruncatedBytes, presentationTime, durationInMicroseconds); }
继续看afterGettingFrame1实现:
void MultiFramedRTPSink::afterGettingFrame1( unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds) { if (fIsFirstPacket) { // Record the fact that we're starting to play now: gettimeofday(&fNextSendTime, NULL); } //若是给予一帧的缓冲不够大,就会发生截断一帧数据的现象。但也只能提示一下用户 if (numTruncatedBytes > 0) { unsigned const bufferSize = fOutBuf->totalBytesAvailable(); envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size (" << bufferSize << "). " << numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least " << OutPacketBuffer::maxSize + numTruncatedBytes << ", *before* creating this 'RTPSink'. (Current value is " << OutPacketBuffer::maxSize << ".)\n"; } unsigned curFragmentationOffset = fCurFragmentationOffset; unsigned numFrameBytesToUse = frameSize; unsigned overflowBytes = 0; //若是包只已经打入帧数据了,而且不能再向这个包中加数据了,则把新得到的帧数据保存下来。 // If we have already packed one or more frames into this packet, // check whether this new frame is eligible to be packed after them. // (This is independent of whether the packet has enough room for this // new frame; that check comes later.) if (fNumFramesUsedSoFar > 0) { //若是包中已有了一个帧,而且不容许再打入新的帧了,则只记录下新的帧。 if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize)) { // Save away this frame for next time: numFrameBytesToUse = 0; fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize, presentationTime, durationInMicroseconds); } } //表示当前打入的是不是上一个帧的最后一块数据。 fPreviousFrameEndedFragmentation = False; //下面是计算获取的帧中有多少数据能够打到当前包中,剩下的数据就做为overflow数据保存下来。 if (numFrameBytesToUse > 0) { // Check whether this frame overflows the packet if (fOutBuf->wouldOverflow(frameSize)) { // Don't use this frame now; instead, save it as overflow data, and // send it in the next packet instead. 
However, if the frame is too // big to fit in a packet by itself, then we need to fragment it (and // use some of it in this packet, if the payload format permits this.) if (isTooBigForAPacket(frameSize) && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) { // We need to fragment this frame, and use some of it now: overflowBytes = computeOverflowForNewFrame(frameSize); numFrameBytesToUse -= overflowBytes; fCurFragmentationOffset += numFrameBytesToUse; } else { // We don't use any of this frame now: overflowBytes = frameSize; numFrameBytesToUse = 0; } fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse, overflowBytes, presentationTime, durationInMicroseconds); } else if (fCurFragmentationOffset > 0) { // This is the last fragment of a frame that was fragmented over // more than one packet. Do any special handling for this case: fCurFragmentationOffset = 0; fPreviousFrameEndedFragmentation = True; } } if (numFrameBytesToUse == 0 && frameSize > 0) { //若是包中有数据而且没有新数据了,则发送之。(这种状况好像很难发生啊!) // Send our packet now, because we have filled it up: sendPacketIfNecessary(); } else { //须要向包中打入数据。 // Use this frame in our outgoing packet: unsigned char* frameStart = fOutBuf->curPtr(); fOutBuf->increment(numFrameBytesToUse); // do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes // Here's where any payload format specific processing gets done: doSpecialFrameHandling(curFragmentationOffset, frameStart, numFrameBytesToUse, presentationTime, overflowBytes); ++fNumFramesUsedSoFar; // Update the time at which the next packet should be sent, based // on the duration of the frame that we just packed into it. // However, if this frame has overflow data remaining, then don't // count its duration yet. 
if (overflowBytes == 0) { fNextSendTime.tv_usec += durationInMicroseconds; fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000; fNextSendTime.tv_usec %= 1000000; } //若是须要,就发出包,不然继续打入数据。 // Send our packet now if (i) it's already at our preferred size, or // (ii) (heuristic) another frame of the same size as the one we just // read would overflow the packet, or // (iii) it contains the last fragment of a fragmented frame, and we // don't allow anything else to follow this or // (iv) one frame per packet is allowed: if (fOutBuf->isPreferredSize() || fOutBuf->wouldOverflow(numFrameBytesToUse) || (fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) || !frameCanAppearAfterPacketStart( fOutBuf->curPtr() - frameSize, frameSize)) { // The packet is ready to be sent now sendPacketIfNecessary(); } else { // There's room for more frames; try getting another: packFrame(); } } }
看一下发送数据的函数:
void MultiFramedRTPSink::sendPacketIfNecessary() { //发送包 if (fNumFramesUsedSoFar > 0) { // Send the packet: #ifdef TEST_LOSS if ((our_random()%10) != 0) // simulate 10% packet loss ##### #endif if (!fRTPInterface.sendPacket(fOutBuf->packet(),fOutBuf->curPacketSize())) { // if failure handler has been specified, call it if (fOnSendErrorFunc != NULL) (*fOnSendErrorFunc)(fOnSendErrorData); } ++fPacketCount; fTotalOctetCount += fOutBuf->curPacketSize(); fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes; ++fSeqNo; // for next time } //若是还有剩余数据,则调整缓冲区 if (fOutBuf->haveOverflowData() && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) { // Efficiency hack: Reset the packet start pointer to just in front of // the overflow data (allowing for the RTP header and special headers), // so that we probably don't have to "memmove()" the overflow data // into place when building the next packet: unsigned newPacketStart = fOutBuf->curPacketSize()- (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize()); fOutBuf->adjustPacketStart(newPacketStart); } else { // Normal case: Reset the packet start pointer back to the start: fOutBuf->resetPacketStart(); } fOutBuf->resetOffset(); fNumFramesUsedSoFar = 0; if (fNoFramesLeft) { //若是再没有数据了,则结束之 // We're done: onSourceClosure(this); } else { //若是还有数据,则在下一次须要发送的时间再次打包发送。 // We have more frames left to send. Figure out when the next frame // is due to start playing, then make sure that we wait this long before // sending the next packet. 
struct timeval timeNow; gettimeofday(&timeNow, NULL); int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec; int64_t uSecondsToGo = secsDiff * 1000000 + (fNextSendTime.tv_usec - timeNow.tv_usec); if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative: uSecondsToGo = 0; } // Delay this amount of time: nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo, (TaskFunc*) sendNext, this); } }
当一帧数据发送完,在doEventLoop()函数执行任务函数sendNext(),继续发送一包,进行下一个循环。音频数据的发送也是如此。
总结一下调用过程(参考牛搞大神):
单播数据发送:
单播的时候,只有收到客户端的“PLAY”命令时,才开始发送数据,这是在RTSPClientSession类的handleCmd_PLAY()函数中调用的:
// Abridged excerpt: the "... ..." markers stand for code the article left out.
// The one visible statement calls startStream() on the subsession of stream
// state `i`, passing this session's id, that state's stream token,
// noteClientLiveness (cast to TaskFunc*) together with `this`, and
// rtpSeqNum / rtpTimestamp.
void RTSPServer::RTSPClientSession ::handleCmd_PLAY(ServerMediaSubsession* subsession, char const* cseq, char const* fullRequestStr) { ... ... fStreamStates[i].subsession->startStream(fOurSessionId, fStreamStates[i].streamToken, (TaskFunc*)noteClientLiveness, this, rtpSeqNum, rtpTimestamp); ... ... }
startStream()函数定义在OnDemandServerMediaSubsession类中:
void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId, void* streamToken, TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData, unsigned short& rtpSeqNum, unsigned& rtpTimestamp)
{ StreamState* streamState = (StreamState*)streamToken; Destinations* destinations = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId)); if (streamState != NULL) { streamState->startPlaying(destinations, rtcpRRHandler, rtcpRRHandlerClientData); if (streamState->rtpSink() != NULL) { rtpSeqNum = streamState->rtpSink()->currentSeqNo(); rtpTimestamp = streamState->rtpSink()->presetNextTimestamp(); } } }
startPlaying函数实现在StreamState类中:
void StreamState::startPlaying(Destinations* dests, TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData) { if (dests == NULL) return; if (!fAreCurrentlyPlaying && fMediaSource != NULL) { if (fRTPSink != NULL) { fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this); fAreCurrentlyPlaying = True; } else if (fUDPSink != NULL) { fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this); fAreCurrentlyPlaying = True; } } if (fRTCPInstance == NULL && fRTPSink != NULL) { // Create (and start) a 'RTCP instance' for this RTP sink: fRTCPInstance = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs, fTotalBW, (unsigned char*)fMaster.fCNAME, fRTPSink, NULL /* we're a server */); // Note: This starts RTCP running automatically } if (dests->isTCP) { // Change RTP and RTCP to use the TCP socket instead of UDP: if (fRTPSink != NULL) { fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId); } if (fRTCPInstance != NULL) { fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId); fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId, rtcpRRHandler, rtcpRRHandlerClientData); } } else { // Tell the RTP and RTCP 'groupsocks' about this destination // (in case they don't already have it): if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort); if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort); if (fRTCPInstance != NULL) { fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort, rtcpRRHandler, rtcpRRHandlerClientData); } } }
这个函数就会去调用RTPSink类中的startPlaying()函数,可是RTPSink没有实现,直接调用父类MediaSink中的startPlaying函数。后面就跟组播同样的采集,组包,发送数据了。