Darwin Streaming Server调研总结linux
Darwin streaming server(简称DSS) 的简介
DSS主要几个特性:macos
支持MP四、3GPP等文件格式;编程
支持MPEG-四、H.264等视频编解码格式;浏览器
支持RTSP流控协议,支持HTTP协议;缓存
支持RTP流媒体传输协议;安全
支持单播和组播;服务器
支持基于Web的管理;网络
具备完备的日志功能。数据结构
此外,该服务器版本提供了一个基于模块的扩展方法。利用DSS提供的API就能够很方便地编写静态或动态的模块,对DSS进行扩展,使其支持其它文件格式、协议或者功能。
DSS服务器的编译安装调试
Darwin Streaming Server(简称DSS)是苹果公司的开源视频服务器版本,最新版本6.0.3
获取包:6.0.3版本wget http://dss.macosforge.org/downloads/DarwinStreamingSrvr6.0.3-Source.tar
linux的补丁
wget http://www.abrahamsson.com/dss-6.0.3.patch
wget http://dss.macosforge.org/trac/attachment/ticket/6/dss-hh-20081021-1.patch?format=raw
1. 安装步骤:
解压:
tar –xvf DarwinStreamingSrvr6.0.3-Source.tar
2. 打补丁:
patch -p1 < dss-6.0.3.patch
3. patch -p1 < dss-hh-20081021-1.patch
4. 修改一下Install:
cd DarwinStreamingSrvr6.0.3-Source
vi Install
行255改为 /usr/sbin/useradd -g qtss qtss
5. 编译:
./Buildit install
生成安装目录:
./DSS_MakeRoot -f /tmp/dss
6. 去安装目录&安装:
cd /tmp/dss
./Install安装完之后,一些默认的基本目录
/var/streaming/logs 日志文件目录
/usr/local/movies 影片存放目录和sdp文件存放目录
/usr/local/sbin/DarwinStreamingServer 启动DSS的入口
/etc/streaming/streamingserver.xml 相关的DSS的配置文件,能够配置端口,目录,等信息
调试:
能够在先对DSS的配置文件/etc/streaming/streamingserver.xml中设置日志日志的级别,而后再在DSS安装目录下面使用./DarwinStreamingServer –d –D 等参数的形式进行前端显示调试,详见./DarwinStreamingServer –h
DSS的框架
服务器的做用是充当网络客户和服务器模块的接口,其中网络客户使用RTP和RTSP协议来发送请求和接收响应,而服务器模块则负责处理请求和向客户端发送数据包。核心服务器经过建立四种类型的线程来完成本身的工做,具体以下:
- 服务器本身拥有的主线程(Main Thread)。这个线程负责检查服务器是否须要关闭,记录状态信息,或者打印统计信息。
- 空闲任务线程(Idle Task Thread)。空闲任务线程管理一个周期性的任务队列。该任务队列有两种类型:超时任务和套接口任务。
- 事件线程(Event Thread)。事件线程负责侦听套接口事件,好比收到RTSP请求和RTP数据包,而后把事件传递给任务线程。
- 一个或者多个任务(Task)线程。任务线程从事件线程中接收RTSP和RTP请求,而后把请求传递到恰当的服务器模块进行处理,把数据包发送给客户端。缺省状况下,核心服务器为每个处理器建立一个任务线程。
模块
媒体服务器使用模块来响应各类请求及完成任务。有三种类型的模块:
1. 内容管理模块
内容管理模块负责管理与媒体源相关的RTSP请求和响应,好比一个文件或者一个广播。每一个模块负责解释客户的请求,读取和解析它们的支持文件或者网络源,而且以RTSP和RTP的方式进行响应。在某些状况下,好比流化mp3的模块,使用的则是HTTP。
QTSSFileModule,QTSSReflectorModule,QTSSRelayModule,和QTSSMP3StreamingModule都是内容管理模块。
2. 服务器支持模块
服务器支持模块执行服务器数据的收集和记录功能。服务器模块包括QTSSErrorLogModule, QTSSAccessLogModule,QTSSWebStatsModule,QTSSWebDebugModule, QTSSAdminModule,和QTSSPOSIXFileSystemModule。
3. 访问控制模块
访问控制模块提供鉴权和受权功能,以及操做URL路径提供支持。
访问控制模块包括QTSSAccessModule,QTSSHomeDirectoryModule,QTSSHttpFileModule,和QTSSSpamDefenseModule。
数据
当一个模块须要访问客户请求的RTSP报头时,能够经过QTSS.h这个API头文件中定义的请求对象来访问相应的请求信息。举例来讲,RTSPRequestInterface类实现了API字典元素,这些元素能够经过API来进行访问。名称是以“Interface”结尾的对象,好比RTSPRequestInterface,RTSPSessionInterface,和QTSServerInterface,则用于实现模块的API。
下面是重要的接口类:
- QTSServerInterface — 这是内部数据的存储对象,在API中标识为QTSS_ServerObject。在API中的每个QTSS_ServerAttributes都在基类中声明和实现。
- RTSPSessionInterace — 这是内部数据的存储对象,在API中标识为qtssRTSPSessionObjectType。在API中的每个QTSS_RTSPSessionAttributes都在基类中声明和实现。
- RTPSessionInterface — 这是内部数据的存储对象,在API中标识为QTSS_ClientSessionObject。在API中的每个QTSS_ClientSessionAttributes都在基类中声明和实现。
- RTSPRequestInterface — 这是内部数据的存储对象,在API中标识为QTSS_RTSPRequestObject。在API中的每个QTSS_RTSPRequestAttributes都在基类中声明和实现。
源代码的组织
Server.tproj
这个目录包含核心服务器(core server)的代码,能够分红三个子系统:
- 服务器内核。这个子系统中的类都有一个QTSS前缀。QTSServer负责处理服务器的启动和关闭。QTSServerInterface负责保存服务器全局变量,以及收集服务器的各类统计信息。QTSSPrefs是存储服务器偏好设定的地方。QTSSModule,QTSSModuleInterface,和QTSSCallbacks类的惟一目的就是支持QTSS的模块API。
- RTSP子系统。这些类负责解析和处理RTSP请求,以及实现QTSS模块API的RTSP部分。其中的几个类直接对应QTSS API的一些元素(好比,RTSPRequestInterface类就是对应于QTSS_RTSPRequestObject对象)。每一个RTSP TCP链接都有一个RTSP会话对象与之相对应。RTSPSession对象是一个Task对象,负责处理与RTSP相关的事件。
- RTP子系统。这些类处理媒体数据的发送。RTPSession对象包含与全部RTSP会话ID相关联的数据。每一个RTPSession都是一个Task对象,能够接受核心服务器的调度来进行RTP数据包的发送。RTPStream对象表明一个单独的RTP流,一个RTPSession对象能够和任何数目的RTPStream对象相关联。这两个对象实现了QTSS模块API中的针对RTP的部分。
CommonUtilitiesLib
这个目录含有一个工具箱,包括线程管理,数据结构,网络,和文本解析工具。Darwin流媒体服务器及其相关工具经过这些类对相似或者相同的任务进行抽象,以减小重复代码;这些类的封装简化了较高层次的代码;借助这些类还分离了专用于不一样平台的代码。下面是对目录下的各个类的简短描述:
- OS类。这些类在时间,条件变量,互斥锁,和线程方面提供了专用于不一样平台的代码抽象。这些类包括OS,OSCond,OSMutex,OSThread,和OSFileSource;数据结构则包括OSQueue,OSHashTable,OSHeap,和OSRef。
- 套接口类(Sockets)。这些类为TCP和UDP网络通信方面提供了专用于不一样平台的代码抽象。一般状况下,套接口类是异步的(或者说是非阻塞的),能够发送事件给Task对象。这些类有:EventContext,Socket,UDPSocket,UDPDemuxer,UDPSocketPool,TCPSocket,和TCPListenerSocket。
- 解析工具。这些类负责解析和格式化文本。包括StringParser,StringFormatter,StrPtrLen,和StringTranslator。
- Task(任务):这些类实现了服务器的异步事件机制。
QTFileLib
流媒体服务器的一个主要特性就是它可以将索引完成(hinted)的QuickTime电影文件经过RTSP和RTP协议提供给客户。这个目录包含QTFile库的源代码,包括负责解析索引完成的QuickTime文件的代码。服务器的RTPFileModule经过调用QTFile库来从索引过的QuickTime文件中取得数据包和元数据。QTFile库能够解析下面几种文件类型:.mov,.mp4(.mov的一种修改版本),和.3gpp(.mov的一种修改版本)。
APICommonCode
这个目录包含与API相关的类的源代码,好比moduletils,或者诸如记录文件的管理这样的公共模块函数。
APIModules
这个目录包含流媒体服务器模块目录,每一个模块都有一个目录。
RTSPClientLib
这个目录包含实现RTSP客户端的源代码,这些代码能够用于链接服务器,只要该链接协议被支持。
RTCPUtilitiesLib
这个目录包含解析RTCP请求的源代码。
APIStubLib
这个目录包含API的定义和支持文件。
HTTPUtilitiesLib
这个目录包含解析HTTP请求的源代码。
二次开发模块添加的要求
每一个DSS模块必须实现两个函数:一个是Main函数,服务器在启动时将调用这个函数进行必要的初始化。另外一个是Dispatch函数,经过实现此函数,服务器可调用DSS模块并完成特定处理。对于编译到服务器里面的模块,其主函数的地址必须传递到服务器的模块Main函数中。
具体实现时,Main函数必须命名为MyModule_Main,其中MyModule是模块的文件名。此函数的实现一般以下所示:
QTSS_Error MyModule_Main(void* inPrivateArgs)
{
return _stublibrary_main(inPrivateArgs, MyModuleDispatch);}
每一个DSS模块都必须提供一个Dispatch函数。服务器为了特定的目的须要使用某个模块时,是经过调用该模块的Dispatch函数来实现的,调用时必须将任务的名称及相应的参数传递给该函数。在DSS中,使用角色(Role)这个术语来描述特定的任务。Dispatch函数的格式以下所示:
void MyModuleDispatch(QTSS_Role inRole,QTSS_RoleParamPtr inParams);
其中MyModuleDispatch是Dispatch函数的名称;MyModule是模块的文件名;inRole是角色的名称,只有注册了该角色的模块才会被调用;inParams则是一个结构体,可用于传递相应的参数。
DSS对TS流的支持
对DSS进行扩展,以实现对TS流的支持,主要涉及三个方面的问题:
首先,RTSP协议须要支持TS over DVB-C;
其次,可以经过UDP协议直接发送TS流;
最后,PTCP的实现,发送的速率须要依据PCR[1](Program ClockReference,即节目时钟参考)实现适当的调节。下面针对这三个方面问题的解决进行简要的说明:
为了让RTSP协议能支持TS传输,须要对标准的RTSP协议作扩展,即在SETUP阶段,终端告诉服务器须要TS传输,服务器会为该终端分配传输资源,并告诉终端相应的参数(包括频点和节目号等)。 当使用扩展后的RTSP协议实现一次TS流点播时,与一般的RTSP交互过程相比,在SETUP阶段有所不一样。
为了实现TS流经过Cable下发,关键点是视频服务器可以采用UDP协议将TS流依特定速率发送到播放设备。
采用UDP协议把TS包发送,实现相对比较简单,假定TS包的大小是188字节的,只要遵守一个UDP包不该大于以太网最大传输单元的原则,将7个TS包打包成一个UDP包,发送给播放器设备便可实现。
而依特定的速率发送则要求服务器在发送TS流时,必须保证发送数据的速率与媒体正常播放的速率一致性。考虑到终端会有一个缓冲区来平滑发送数据时可能产生的波动,所以对于发送速率与正常播放速率的一致性的要求并非绝对的。但发送数据带来的波动要在播放设备许可的范围内,不然没法正常播放。
Apple公司Darwin流式服务器源代码分析
当前,伴随着Internet的飞速发展,计算机网络已经进入到每个普通人的家庭。在这个过程当中,一个值得咱们关注的现象是:Internet中存储和传输内容的构成已经发生了本质的改变,从传统的基于文本或少许图像的主页变为大容量、富信息量的流式媒体信息。一份早在1998年提交的研究报告就曾指出,流式媒体统治Internet的潮流是不可抗拒的,该报告估计到2003年,存储在网络服务器上的内容超过50%的将是流式媒体信息。但今天看来,这个估计仍是有些保守了。所谓的流式媒体简单的讲就是指人们经过网络实时的收看多媒体信息:如音频流、视频流等。与流式媒体对应的传统工做方式是下载+播放模式,即用户首先下载多媒体文件,而后再在本地播放,这种方法的一个主要缺点是启动延迟较大,例如一个30分钟长的MPEG-I文件(至关于VCD质量),即便使用1.5Mbps的速率下载,也须要半个小时才能完成,这样一个漫长的等待时间实在是没法忍受。在窄带网络环境中,几乎全部基于Internet的流式媒体产品都有着相似的工做原理:首先须要开发高效的压缩编码技术,并经过一套完整有效的传输体系将其发布到用户的桌面上。目前在流式媒体领域,有三种占有主导地位的产品,它们分别是Apple公司的Quick Time、Microsoft公司的Media Server以及Real公司的Real System。本文将介绍QuickTime技术及其开放源代码的Darwin流化服务器。1 QuickTime技术介绍Apple公司近日发布了QuickTime 5及QuickTime Streaming Server 3(简称QTSS)。做为客户端的QuickTime 5是用于在Internet上对高质量音频和视频内容进行建立、播放及提供数字流的软件,目前QuickTime在全世界的使用量已经超过1亿5千万份。QuickTime Streaming Server 3是Apple基于标准的、开放式源代码的流式服务器软件的新版本,它包括如下新功能:跳读保护(Skip Protection),一项得到专利的特性组合,它能够保证Internet上数字流的质量,防止中断;全新的易于使用、基于Web的界面,用户能够在本地或远程进行管理,实现服务器配置。做为Internet流媒体联盟(ISMA)的建立者之一,Apple不断致力于开⒎弦到绫曜嫉牟泛图际酰ü岣呋ゲ僮餍岳从呕没У氖褂锰逖椋壳癚uickTime已被国际标准组织(ISO)选为MPEG-4的基本文件格式,可预见Apple将有更多MPEG-4 产品和技术的推出。QuickTime正迅速成为世界领先的跨平台多媒体技术,并且是迄今为止惟一的开放式源代码、基于标准的数字流解决方案。ZDNet在2000年9月对于三种流式媒体服务器的特征比较说明了QTSS不只仅被技术开发者关注,并且能够经过简单的定制成为成熟强大的产品,评测结果可见表1。表1 ZDNet对三类产品的评测结果服务器模块 QTSS 2.01 Media Server 7 RealServer Basic 7操做系统支持 Windows NT, 2000; FreeBSD; Linux; Mac OS; Solaris Windows NT, 2000 Windows NT, 2000并发流个数 2,000 2,000 25 free/3000 pro现场直播和广播 Yes Yes Yes在线广告支持 Yes Yes YesPPV/流加密 No / No Yes / Yes Yes / Yes分配流能力 No Yes YesSMIL标准支持 Yes No YesRTSP标准支持 Yes No Yes多播支持 Yes Yes Yes状态报告 Yes Yes Yes服务器日志 Yes Yes Yes防火墙和代理支持 Yes Yes Yes远程监控 Yes Yes Yes客户可使用QuickTime Player或其余支持QuickTime的应用程序在Windows或Macintosh平台上接收视频流,并且QuickTime Player能够从苹果公司的网站上下载无偿使用。若是安装了QuickTime的插件,客户还能够直接经过浏览器收看。客户但愿点播一个节目时,QuickTime Player或插件将向QTSS发送请求,指明要点播的节目名。若是该节目存在,QTSS将向客户发送相应的视频流。当客户但愿收看现场直播(或实时广播)时,它首先从QTSS得到关于当前频道的编码格式、地址等相关信息,而后再接受该频道的媒体流。对于那些但愿在Internet上实时流化视频或音频信息的用户,QTSS服务器将是一个很好的选择,经过它可实现多项任务,例如:建立一个24小时在线的Internet广播电台;现场实况转播:如公司会议、体育节目等;建立远程学习站点:如可以点播视频和演讲; 图1是一个利用QTSS服务器创建的现场直播场景。2 Darwin流化服务器介绍Darwin Streaming Server(简称DSS)是QuickTime Streaming Server开放式源代码的版本,同时支持FreeBSD、Linux、Solaris、Windows NT和Windows 2000等多个操做系统,是当前全部同类产品中支持平台最多的一个。DSS的源代码和相关文档可从如下站点得到:http://www.apple.comDSS源代码彻底采用标准C++语言写成,编程风格很是优秀,每一个C++类都对应着一对和类同名的.h/.cpp文件。可是因为大量采用了面向对象的概念,如继承、多态等等;并且源文件和类至关多,因此不太容易讲清楚。所以,读者最好事先把代码完整的过滤一两遍,再配合本文,就能看得更清楚点。整个服务器包括多个子系统,分别存放在独立的工程内,如图2所示。其中,最为重要的是基础功能类库(CommonUtilitiesLib)和流化服务器(StreamingServer)两个工程,前者是整个系统的通用代码工具箱,包括了线程管理、数据结构、网络和文本分析等多个功能模块。DSS和其余相关的工具使用基础功能类库工程中定义的功能类实现如下三个目标:(1)抽象出系统中相同或相似的功能,用于下降代码的冗余度;(2)封装基本功能,简化高层编码的复杂度;(3)隔离开操做系统平台相关的代码。而流化服务器工程中包含了DSS对多个国际标准的实现,是整个服务器的主工程。在本文中,咱们将重点分析这两个工程中的核心代码和模块。另外,咱们还将简单介绍利用DSS提供的开发接口(Module)扩展和定制服务器的方法。DSS实现了四种IETF制定的国际标准,分别是:实时流传输协议RTSP(Real-time Streaming Protocol, RFC 2326)、实时传输协议(RTP Real-time Transfer Protocol,RFC 1889)、实时传输控制协议RTCP(Real-time Transport Control Protocol,RFC 1889)、会话描述协议SDP(Session Description Protocol,RFC 2327)。这四个标准是开发全部流式媒体产品都必须掌握的,所以在对相关代码进行分析和二次开发以前,但愿读者了解上述四种协议的基本思想,上述协议样本可从如下网站得到:http://www.ietf.org3 基础功能类库(Common Utilities)3.1 OS类Darwin Streaming Server支持包括Windows,Linux以及Solaris在内的多种操做系统平台。咱们知道,Windows和Unix(或Unix-like)操做系统之间不管从内核仍是编程接口上都有着本质的区别,即便是Linux和Solaris,在编程接口上也大为不一样。为此,DSS开发了多个用于处理时间、临界区、信号量、事件、互斥量和线程等操做系统相关的类,这些类为上层提供了统一的使用接口,但在内部却须要针对不一样的操做系统采用不一样的方法实现。表2罗列出了DSS中的主要OS类和数据结构。表2 DSS中的主要OS类和数据结构类(数据结构)名 主要功能OS 平台相关的功能类,如内存分配、时间等OSCond 状态变量的基本功能和操做OSMutex 互斥量的基本功能和操做OSThread 线程类OSFileSource 简单文件类OSQueue 队列类OSHashTable 哈希表类OSHeap 堆类OSRef 参考引用类3.1.1 OSMutex/OSCond Class在有多个线程并发运行的环境中,能同步不一样线程的活动是很重要的,DSS开发了OSMutex和OSCond两个类用以封装不一样操做系统对线程同步支持的差别。咱们首先分析OSMutex类,这个类定义了广义互斥量的基本操做,类定义以下:class OSMutex{1 public:2 OSMutex(); //构造函数3 ~OSMutex(); //析构函数4 inline void Lock(); //加锁5 inline void Unlock(); //解锁6 inline Bool16 TryLock(); //异步锁,不管是否成功当即返回7 private:8 #ifdef __Win32__9 CRITICAL_SECTION fMutex; //临界区10 DWORD fHolder; //拥有临界区的线程id11 UInt32 fHolderCount; //进入临界区线程数 //其余略…}在Windows平台上,OSMutex类是经过临界区(CRITICAL_SECTION)来实现的,第10行定义了临界区变量fMutex。类实例化时构造函数调用InitializeCriticalSection(&fMutex)初始化临界区变量,对应的在析构函数中调用DeleteCriticalSection(&fMutex)清除。Lock()函数用于对互斥量加锁,它调用私有方法RecursiveLock实现:void OSMutex::RecursiveLock(){ // 当前线程已经拥有互斥量,只需增长引用计数1 if (OSThread::GetCurrentThreadID() == fHolder)2 {3 fHolderCount++; //增长引用计数4 return;5 }6 #ifdef __Win32__7 ::EnterCriticalSection(&fMutex); //申请进入临界区8 #else9 (void)pthread_mutex_lock(&fMutex);10 #endif11 Assert(fHolder == 0);12 fHolder = OSThread::GetCurrentThreadID(); //更新临界区拥有者标志13 fHolderCount++; 14 Assert(fHolderCount == 1);} 第1行检测若是当前线程已经拥有互斥量,就只需将内部计数fHolderCount加1,以便纪录正在使用互斥量的方法数。若是当前线程尚未获得互斥量,第7行调用EnterCriticalSection()函数申请进入临界区;若是当前已经有其余线程进入临界区,该函数就会阻塞,使得当前线程进入睡眠状态,直到占用临界区的线程调用LeaveCriticalSection(&fMutex)离开临界区后才可能被唤醒。一旦线程进入临界区后,它将首先更新临界区持有者标志(第12行),同时将临界区引用计数加1。 注意到另一个函数TryLock(),该函数也是用于为互斥量加锁,但与Lock()不一样的是,TryLock()函数为用户提供了异步调用互斥量的功能,这是由于它调用::TryEnterCriticalSection(&fMutex)函数申请进入缓冲区:若是临界区没有被任何线程拥有,该函数将临界区的访问区给予调用的线程,并返回TRUE,不然它将马上返回FALSE。TryEnterCriticalSection()和EnterCriticalSection()函数的本质区别在于前者从不挂起线程。接着分析OSCond类,该类定义了状态变量(Condition Variable)的基本操做,类定义以下:class OSCond {1 public:2 OSCond(); //构造函数3 ~OSCond(); //析构函数 4 inline void Signal(); //传信函数5 inline void Wait(OSMutex* inMutex, SInt32 inTimeoutInMilSecs = 0); //等待传信函数6 inline void Broadcast(); //广播传信函数7 private:8 #ifdef __Win32__9 HANDLE fCondition; //事件句柄10 UInt32 fWaitCount; //等待传信用户数//其余略… } 虽然同是用于线程同步,但OSCond类与OSMutex大不相同,后者用来控制对关键数据的访问,而前者则经过发信号表示某一操做已经完成。在Windows平台中,OSCond是经过事件(event)来实现的;构造函数调用CreateEvent()函数初始化事件句柄fCondition,而析构函数则调用CloseHandle()关闭句柄。 OSCond的使用流程是这样的:线程调用Wait(OSMutex* inMutex, SInt32 inTimeoutInMilSecs = 0)函数等待某个事件的发生,其中inTimeoutInMilSecs是最长等待时间,0表明无限长。Wait()函数内部调用了WaitForSingleObject (fCondition, theTimeout)函数,该函数告诉系统线程在等待由事件句柄fCondition标识的内核对象变为有信号,参数theTimeout告诉系统线程最长愿意等待多少毫秒。若是指定的内核对象在规定时间内没有变为有信号,系统就会唤醒该线程,让它继续执行。而函数Signal()正是用来使事件句柄fCondition有信号的。Signal()函数内部实现很简单,只是简单调用SetEvent函数将事件句柄设置为有信号状态。 使用OSCond的过程当中存在一种需求,就是但愿通知全部正在等待的用户事件已经完成,而Signal()函数每次只能通知一个用户,所以又开发了另一个广播传信函数以下:inline void OSCond::Broadcast(){ //提示:本函数至关循环调用Signal()函数1 #ifdef __Win32__2 UInt32 waitCount = fWaitCount; //等待传信的用户数3 for (UInt32 x = 0; x ; x++) //循环为每一个用户传信4 {5 BOOL theErr = ::SetEvent(fCondition); //设置事件句柄为有信号状态6 Assert(theErr == TRUE);7 }//此处略… } Broadcast首先统计全部等待传信的用户数(第2行),而后用一个循环为每一个用户传信(第3~7)行。这种编程方法虽然不是很优雅(elegant),可是因为Windows平台上不支持广播传信功能(Linux和Solaris均支持),也只好如此。3.1.2 OSThread ClassOSThread是DSS中最重要的类之一,它封装而且定义了使用线程的方式,所以须要重点讨论。OSThread类的定义以下:class OSThread{1 public: // 必须在使用其余OSThread函数前调用该初始化函数2 static void Initialize(); 3 OSThread(); //构造函数4 virtual ~OSThread(); //析构函数 //子类继承该纯虚函数完成本身的工做5 virtual void Entry() = 0; 6 void Start(); //启动线程7 void Join(); //等待线程运行完成后删除8 void Detach(); //使线程处于fDetached状态9 static void ThreadYield(); //Windows平台不用10 static void Sleep(UInt32 inMsec); //让线程睡眠 …11 private: //标识线程的状态12 Bool16 fStopRequested:1;13 Bool16 fRunning:1;14 Bool16 fCancelThrown:1;15 Bool16 fDetached:1;16 Bool16 fJoined:1; …17 static void CallEntry(OSThread* thread);//调用子类重载的虚函数18 #ifdef __Win32__//使用_beginghreadex建立线程时的标准入口函数19 static unsigned int WINAPI _Entry(LPVOID inThread); 20 #else21 static void* _Entry(void* inThread); //unix下的入口函数22 #endif}OSThread封装了线程的基本功能,一个OSThread的实例表明一个线程。用户经过继承OSThread,而且重载其中的纯虚函数Entry(第5行),从而将本身的任务交给该线程运行。OSThread内部运行机制比较复杂,为此咱们用图3所示的流程来描述其运行过程。 另外,OSThread对于线程的状态定义了一套完整的控制方法。当用户调用start()函数后,按照上图,最终将调用CallEntry()函数,而该函数在调用Entry()以前将线程设定为运行状态(thread->fRunning = true),当Entry()函数运行完后再设为非运行状态;在运行过程当中,用户能够经过StopAndWaitForThread()、join()、Detach()以及ThrowStopRequest()等函数改变线程其余状态变量。3.1.3 OSHashTable/OSQueue/OSHeap/OSRef ClassDSS定义了几个通用的较为复杂的数据结构,它们都以类的方式封装。这些数据结构不但贯穿于DSS的全部源代码,并且因为其封装的十分好,读者能够在看懂源代码的基础上很容易的将它们从DSS的工程中抽取出来,构建本身的基础类库,为未来的开发工做打下良好的基础。另外,对这些基础数据结构源代码的研究将提升咱们对于面向对象技术的掌握和领会。 最主要的数据结构有四种:哈希表(OSHashTable)、队列(OSQueue)、堆(OSHeap)和对象引用表(OSRef)。前三种是咱们在编程中大量使用的数据结构,而对象引用表则是相似于COM/DCOM组件编程中IUNKOWN接口功能的数据结构,它首先为每一个对象创建了一个字符串形式的ID,以便于经过这个ID找到对象(相似于QueryInterface);另外OSRef类还为每一个对象实例创建了引用计数,只有一个对象再也不被任何人引用,才可能被释放(相似于AddRef和Release)。 鉴于这几个类在结构上有类似之处,下面咱们将分析OSHashTable的源代码,以便可以帮助读者更好的理解其余几个类。OSHashTable的代码以下: template class OSHashTable { /*提示:OSHashTable被设计成为一个类模版,两个输入参数分别为:class T:实际的对象类;class K:用于为class T计算哈希表键值的功能类。*/1 public:2 OSHashTable( UInt32 size ) //构造函数,入参是哈希表中对象的最大个数3 {4 fHashTable = new ( T*[size] ); //申请分配size个哈希对象class T的空间5 Assert( fHashTable );6 memset( fHashTable, 0, sizeof(T*) * size ); //初始化7 fSize = size;/*下面的代码决定用哪一种方式为哈希表的键值计算索引;若是哈希表的大小不是2的幂,只好采用对fSize求余的方法;不然能够直接用掩码的方式,这种方式相对速度更快*/8 fMask = fSize - 1;9 if ((fMask & fSize) != 0) //fSize不是2的幂10 fMask = 0;11 fNumEntries = 0; //当前对象数12 }13 ~OSHashTable() //析构函数14 {15 delete [] fHashTable; //释放空间16 }//下面介绍向哈希表中添加一个class T对象的源代码17 void Add( T* entry ) {18 Assert( entry->fNextHashEntry == NULL ); /*利用功能类class K,计算class T对象的哈希键值,其计算方法由用户在class K中定义*/ 19 K key( entry ); 20 UInt32 theIndex = ComputeIndex( key.GetHashKey() );//利用键值计算索引21 entry->fNextHashEntry = fHashTable[ theIndex ]; //在新加对象中存储索引值22 fHashTable[ theIndex ] = entry; //将该对象插入到索引指定的位置23 fNumEntries++; /24 }//下面介绍从哈希表中删除一个class T对象的源代码25 void Remove( T* entry )26 {//首先从哈希表中找到待删除的对象//一、计算哈希键值和其对应的对象索引27 key( entry ); 28 UInt32 theIndex = ComputeIndex( key.GetHashKey() ); 29 T* elem = fHashTable[ theIndex ];30 T* last = NULL;/*二、经过对象索引查找对象,若是不是要找的对象,接着找下一个,直到找到为止。这是由于,存放的时候就是按照这种模式计算索引的。*/31 while (elem && elem != entry) { 32 last = elem;33 elem = elem->fNextHashEntry;34 } //找到该对象,将其删除35 if ( elem ) 36 {37 Assert(elem);38 if (last) 39 last->fNextHashEntry = elem->fNextHashEntry;40 else //elem在头部41 fHashTable[ theIndex ] = elem->fNextHashEntry;42 elem->fNextHashEntry = NULL;43 fNumEntries--;44 }45 }//下面介绍从哈希表中查找一个class T对象的方法46 T* Map( K* key ) //入参为哈希键值47 {48 UInt32 theIndex = ComputeIndex( key->GetHashKey() ); //计算索引49 T* elem = fHashTable[ theIndex ]; //找到索引对应的对象50 while (elem) {51 K elemKey( elem );52 if (elemKey =*key) //检查是否找对53 break;54 elem = elem->fNextHashEntry; //若是不是,继续找下一个55 }56 return elem;57 }//如下略…} 以上介绍了哈希表的构造以及三种基本操做:添加、删除和查询。另外,DSS还定义了OSHashTableIter类用于枚举OSHashTable中的class T对象;其中主要的操做有First和Next等,限于篇幅,此处就再也不详述。3.2 Tasks类由于服务器从总体上采用了异步的运行模式,这就须要一种用于事件通讯的机制。举例来讲:一个RTSP链接对应的Socket端口监测到网络上有数据到达,此时必须有一个模块(或代码)被通知(notify)去处理这些数据。为此,DSS定义了Task及其相关类做为实现这一通讯机制的核心。在Task.h/cpp文件中,定义了三个主要的类,分别是:任务线程池类(TaskThreadPool Class)、任务线程类(TaskThread Class)以及任务类(Task Class)。每一个Task对象有两个主要的方法:Signal和Run。当服务器但愿发送一个事件给某个Task对象时,就会调用Signal()方法;而Run()方法是在Task对象得到处理该事件的时间片后运行的,服务器中的大部分工做都是在不一样Task对象的Run()函数中进行的。每一个Task对象的目标就是利用很小的且不会阻塞的时间片完成服务器指定某个工做。任务线程类是上文介绍的OSThread类的一个子类,表明专门用于运行任务类的一个线程。在每一个任务线程对象内部都有一个OSQueue_Blocking类型的任务队列,存储该线程须要执行的任务。后面的分析能够看到,服务器调用一个任务的Signal函数,实际上就是将该任务加入到某个任务线程类的任务队列中去。另外,为了统一管理这些任务线程,DSS还开发了任务线程池类,该类负责生成、删除以及维护内部的任务线程列表。图4描述了任务类的运行。 下面咱们首先分析TashThread类,该类的定义以下:class TaskThread : public OSThread //OSThread的子类{ //提示:全部的Task对象都将在TaskThread中运行 1 public: 2 TaskThread() : OSThread(), fTaskThreadPoolElem(this){} //构造函数3 virtual ~TaskThread() { this->StopAndWaitForThread(); } //析构函数 4 private: … 5 virtual void Entry(); //从OSThread重载的执行函数,仍然可以被子类重载 6 Task* WaitForTask(); //检测是否有该执行的任务 7 OSQueueElem fTaskThreadPoolElem; //对应的线程池对象 8 OSHeap fHeap; //纪录任务运行时间的堆,用于WaitForTask函数 /*关键数据结构:任务队列;在Task的Signal函数中直接调用fTaskQueue对象的EnQueue函数将本身加入任务队列*/ 9 OSQueue_Blocking fTaskQueue; //此处略… } 做为OSThread的子类,TaskThread重载了Entry函数,一旦TaskThread的对象被实例化,便运行该函数。Entry()函数的主要任务就是调用WaitForTask()函数监测任务队列,若是发现新任务,就在规定时间内执行;不然,就被阻塞。下面咱们简要分析Entry()函数的流程: void TaskThread::Entry(){ 1 Task* theTask = NULL; //空任务 2 while (true) //线程循环执行 3 { //监测是否有须要执行的任务,若是有就返回该任务;不然阻塞; 4 theTask = this->WaitForTask(); 5 Assert(theTask != NULL); 6 Bool16 doneProcessingEvent = false; //还没有处理事件 7 while (!doneProcessingEvent) 8 { 9 theTask->fUseThisThread = NULL; // 对任务的调度独立于线程 10 SInt64 theTimeout = 0; //Task中Run函数的返回值,重要 //核心部分:运行任务,根据返回值判断任务进度 11 if (theTask->fWriteLock) 12 { //若是任务中有写锁,须要使用写互斥量,不然可能形成死锁 13 OSMutexWriteLocker mutexLocker(&TaskThreadPool::sMutexRW); 14 theTimeout = theTask->Run(); //运行任务,获得返回值 15 theTask->fWriteLock = false; 16 } 17 else 18 { //使用读互斥量 19 OSMutexReadLocker mutexLocker(&TaskThreadPool::sMutexRW); 20 theTimeout = theTask->Run(); //运行任务,获得返回值 21 } 22 //监测Task中Run()函数的返回值,共有三种状况 23 //一、返回负数,代表任务已经彻底结束 24 if (theTimeout 25 { 26 delete theTask; //删除Task对象 27 theTask = NULL; 28 doneProcessingEvent = true; 19 } 30 //二、返回0,代表任务但愿在下次传信时被再次当即执行 31 else if (theTimeout=0) 32 { 33 doneProcessingEvent = compare_and_store(Task::kAlive, 0, &theTask->fEvents); 34 if (doneProcessingEvent) 35 theTask = NULL; 36 } //三、返回正数,代表任务但愿在等待theTimeout时间后再次执行 37 else 38 { /*将该任务加入到Heap中,而且纪录它但愿等待的时间。Entry()函数将经过waitfortask()函数进行检测,若是等待的时间到了,就再次运行该任务*/ 39 theTask->fTimerHeapElem.SetValue(OS::Milliseconds() + theTimeout); 40 fHeap.Insert(&theTask->fTimerHeapElem); 41 (void)atomic_or(&theTask->fEvents, Task::kIdleEvent);//设置Idle事件 42 doneProcessingEvent = true; 43 } //此处略… } 注意,若是Task的Run()函数返回值TimeOut为正数,意味着该任务是一个周期性的工做,例如发送数据的视频泵(pump),须要每隔必定时间就发出必定量的视频数据,直至整个节目结束。为此,在第38~43行,将该任务加入到堆fHeap中去,而且标记该任务下次运行的时间为TimeOut毫秒以后。未来经过调用WaitForTask()函数就能检测到该任务是否到达规定的运行时间,WaitForTask()函数的代码以下: Task* TaskThread::WaitForTask(){ 1 while (true) 2 { //获得当前时间,该函数为静态函数,定义见OS.h 3 SInt64 theCurrentTime = OS::Milliseconds(); /*若是堆中有任务,且任务已经到执行时间,返回该任务。 PeekMin函数见OSHeap.h,窃听堆中第一个元素(但不取出)*/4 if ((fHeap.PeekMin() != NULL) && (fHeap.PeekMin()->GetValue() 从堆中取出第一个任务返回5 return (Task*)fHeap.ExtractMin()->GetEnclosingObject(); //若是堆中有任务,可是还没有到执行时间,计算须要等待的时间 6 SInt32 theTimeout = 0; 7 if (fHeap.PeekMin() != NULL) //计算还需等待的时间 8 theTimeout = fHeap.PeekMin()->GetValue() - theCurrentTime; 9 Assert(theTimeout >= 0); //等待theTimeout时间后从堆中取出任务返回 10 OSQueueElem* theElem = fTaskQueue.DeQueueBlocking(this, theTimeout); 11 if (theElem != NULL) 12 return (Task*)theElem->GetEnclosingObject(); 13 } } 上文曾经提到,Task对象内有两个方法:Signal和Run。Run函数是一个虚函数,由Task的子类重载,它的用法咱们在分析TaskThread的Entry()函数和WaitForTask()函数中已经讨论了。而另外一个Signal()函数也十分重要:服务器经过调用该函数将Task加入TaskThread,而且执行Run()函数。Signal()函数的核心部分以下: void Task::Signal(EventFlags events){ … // fUseThisThread用于指定该任务运行的任务线程 1 if (fUseThisThread != NULL) //存在指定任务线程 //将该任务加入到指定任务线程的任务队列中 2 fUseThisThread->fTaskQueue.EnQueue(&fTaskQueueElem); //不存在指定的任务线程,随机选择一个任务线程运行该任务3 else 4 { //从线程池中随机选择一个任务线程 5 unsigned int theThread = atomic_add(&sThreadPicker, 1); 6 theThread %= TaskThreadPool::sNumTaskThreads; //将该任务加入到上面选择的任务线程的任务队列中 7 TaskThreadPool::sTaskThreadArray[theThread]-> fTaskQueue.EnQueue (&fTaskQueueElem); 8 } } 至此咱们已经将DSS的线程和任务运行机制分析完了,这种由事件去触发任务的概念已经被集成到了DSS的各个子系统中。例如,在DSS中常常将一个Task对象和一个Socket对象关联在一块儿,当Socket对象收到事件(经过select()函数),相对应的Task对象就会被传信(经过Signal()函数);而包含着处理代码的Run()函数就将在某个任务线程中运行。 所以,经过使用这些Task对象,咱们就可让全部链接都使用一个线程来处理,这也是DSS的缺省配置方法。3.3 Socket类做为一个典型的网络服务器,DSS源代码中的Socket编程部分是其精华之一。DSS定义了一系列Socket类用于屏蔽不一样平台在TCP/UDP编程接口和使用方法上的差别。DSS中的Socket类通常都采用异步模式的(即非阻塞的),并且可以向对应的Task对象传信(Signal),这点咱们在上一节介绍过。Socket类中具备表明性的类是:EventContext、EventThread、Socket、UDPSocket、TCPSocket以及TCPListenerSocket等等,它们之间的继承关系见图5。 在eventcontext.h/.cpp文件中,定义了两个类:EventContext类和EventThread类。 Event Context提供了检测Unix式的文件描述符(Socket就是一种文件描述符)产生的事件(一般是EV_RE 或 EV_WR)的能力,同时还能够传信指定的任务。EventThread类是OSThread类的子类,它自己很简单,只是重载了OSThread的纯虚函数Entry(),用以监控全部的Socket端口是否有数据到来,其代码分析以下: void EventThread::Entry(){/*该结构定义在ev.h中,记录Socket描述符和在该描述符上发生的事件*/ 1 struct eventreq theCurrentEvent; 2 ::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) ); //初始化该结构 3 while (true)4 {//首先监听Socket端口的事件 5 int theErrno = EINTR; 6 while (theErrno=EINTR) 7 {8 #if MACOSXEVENTQUEUE //Macos平台 9 int theReturnValue = waitevent(&theCurrentEvent, NULL);10 #else //其余平台 /*调用select_waitevent函数监听全部的Socket端口,直到有事件发生为止*/ 11 int theReturnValue = select_waitevent(&theCurrentEvent, NULL);12 #endif … //有事件发生,唤醒相应的Socket端口13 if (theCurrentEvent.er_data != NULL) 14 { //经过事件中的标识找到相应的对象参考指针 15 StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data)); 16 OSRef* ref = fRefTable.Resolve(&idStr); 17 if (ref != NULL) 18 { //经过参考指针获得EventContext对象 19 EventContext* theContext = (EventContext*)ref->GetObject(); //利用EventContext对象的ProcessEvent方法传信对应的Task 20 theContext->ProcessEvent(theCurrentEvent.er_eventbits); 21 fRefTable.Release(ref); //减小引用计数 22 }//此处略…} 上述代码有两点须要注意:首先在第11行,调用select_waitevent函数监听全部Socket端口的事件。该函数在Windows平台上是采用WSAAsyncSelect(异步选择)模型实现的。具体实现是:系统首先建立一个窗口类,该类专门用于接受消息;在每一个Socket端口建立后,调用WSAsyncSelect函数,同时将上述窗口类的句柄做为参数传入;未来这些Socket端口有事件发生时,Windows就会自动将这些事件映射为标准的Windows消息发送给窗口类,此时select_waitevent函数经过检查消息就可以得到对应Socket端口发生的事件。对于Windows平台下Socket的异步编程技术细节请参阅《Windows网络编程技术》一书。 另外,在第20行调用的EventContext对象的ProcessEvent函数实现上很简单,只有一行代码:fTask->Signal(Task::kReadEvent);其中fTask为该EventContext对象对应的Task对象;ProcessEvent函数向Task对象传信,以便及时处理刚刚发生的Socket事件。 与EventThread对应的EventContext对象负责维护指定的描述符,其主要函数包括InitNonBlocking、CleanUp和RequestEvent等。其中InitNonBlocking函数调用Socket API ioctlsocket将用户指定的描述符设置为异步,CleanUp函数用于关闭该描述符;另外,用户经过RequestEvent函数申请对该描述符中某些事件的监听,如前所述,该函数内部调用了WSAsyncSelect来实现这一功能。 Socket Class、UDPSocket Class和TCPSocketClass三个类都是EventContext的子类,它们封装了TCP和UDP的部分实现,同时扩展了EventContext中的事件,但都没有改变其运行机制,所以此处再也不详述,留给读者自行分析。咱们要为你们分析的是另一个比较复杂的Socket类TCPListenerSocket类。TCPListenerSocket用于监听TCP端口,当一个新链接请求到达后,该类将赋予这个新链接一个Socket对象和一个Task对象的配对。首先分析TCPListenerSocket类的主要定义以下: class TCPListenerSocket : public TCPSocket, public IdleTask{/*提示:该类从有两个基类,因此它既是一个事件监听者,同时也是一个任务Task。做为一个任务,给TCPListenerObject发送Kill事件就能够删除它*/ 1 public: 2 TCPListenerSocket() : TCPSocket(NULL, Socket::kNonBlockingSocketType), IdleTask(), fAddr(0), fPort(0), fOutOfDescriptors(false) {} //构造函数 3 virtual ~TCPListenerSocket() {} //析构函数 //addr为地址,port为端口号,初始化函数自动监听TCP端口 4 OS_Error Initialize(UInt32 addr, UInt16 port); //子类必须重载该纯虚函数,用于创建新链接时生成任务对象 5 virtual Task* GetSessionTask(TCPSocket** outSocket) = 0; 6 virtual SInt64 Run(); //重载Task的Run函数,子类仍可重载 7 private: //重载EventContext的ProcessEvent函数,用于产生Socket和Task对象配对8 virtual void ProcessEvent(int eventBits); 9 OS_Error Listen(UInt32 queueLength);//其余略…} 前面咱们分析得知,EventContext类经过ProcessEvent函数来实现对任务的传信工做,但在TCPListenerSocket 中,ProcessEvent函数被重载用来建立Socket和Task对象得配对,该函数的实现以下: void TCPListenerSocket::ProcessEvent(int /*eventBits*/){ /*提示:该函数运行于系统惟一的EventThread线程中,因此要尽可能快速,以避免占用过多的系统资源*/ //此处略去部分定义… 1 Task* theTask = NULL; //Task对象 2 TCPSocket* theSocket = NULL; //Socket对象 //建立对象配对 3 while (true) 4 { //accept链接 5 int osSocket = accept(fFileDesc, (struct sockaddr*)&addr, &size); 6 if (osSocket == -1) //监听端口出错 7 { //此处略去出错处理 } //用子类重载的GetSessionTask函数建立Task对象 8 if ((theTask = this->GetSessionTask(&theSocket))=NULL) //建立出错 9 close(osSocket); 10 else //建立成功,接着建立Socket对象 11 { 12 Assert(osSocket != EventContext::kInvalidFileDesc); //此处略去部分对新建链接端口的设置(setsockopt函数) //建立新的Socket对象 13 theSocket->Set(osSocket, &addr); 14 theSocket->InitNonBlocking(osSocket); //初始化 15 theSocket->SetTask(theTask); //设置对应的任务 16 theSocket->RequestEvent(EV_RE); //新对象监听读事件 17 } 18 } //处理完一次链接请求后,TCPListenerSocket对象还要接着监听 19 this->RequestEvent(EV_RE);} 对Socket类的分析基本完成了,从中咱们能够发现,DSS对于网络传信和任务调度之间的处理很是精密,环环相扣,在某种程度上甚至是有些过a于花哨。可是这些基本类是上层RTSP/RTP等服务器子系统编码的基础,所以但愿读者可以从本质上掌握这些代码。4 核心功能库(Server Core)4.1 RTSP 子系统 RTSP标准是实时流控制协议(Real-Time Streaming Protocol RFC2326)的简称,它被客户和流式媒体服务器用来交换对媒体的控制信息。图6是RTSP基本操做的描述。再给出一个RTSP协议的例子以下: DSS开发了一个RTSP子系统来支持标准的RTSP协议,本节将分析这些源代码。 首先,DSS定义了一个TCPListenerSocket类的子类RTSPListenerSocket,用于监听RTSP链接请求。RTSPListenerSocket类作的惟一一件事就是重载了GetSessionTask函数,当客户的链接请求到达后,它建立了一个Socket对象和RTSPSession对象的配对。RTSPSession对象是Task类的子类,是专门用于处理RTSP请求的任务类。 如图7所示,RTSP链接创建后,服务器会为每一个客户维护一个Socket对象和RTSPSession对象的配对;当客户的RTSP请求到达时,Socket对象就会调用RTSPSession对象的Signal方法传信,即将RTSPSession对象加入到TaskThread对象的任务队列中去;而当时间片到来,TaskThread线程就会调用RTSPSession对象的Run方法,这个方法就会处理客户发送过来的RTSP请求。所以,下面咱们将主要分析RTSPSession的Run方法。 为了跟踪当前处理的状况,RTSPSession类内部定义了多个状态,而Run方法其实就是经过在这些状态之间不断切换,同时对客户的RTSP请求作出不一样的处理。 enum { //RTSPSession的基本状态 kReadingRequest= 0, kFilteringRequest= 1, kRoutingRequest= 2, kAuthenticatingRequest= 3, kPreprocessingRequest= 4, kProcessingRequest= 5, kSendingResponse= 6, kPostProcessingRequest = 7, kCleaningUp= 8, //当RTSP协议经过HTTP隧道实现时将用到下面的状态 kWaitingToBindHTTPTunnel = 9, kSocketHasBeenBoundIntoHTTPTunnel = 10,kHTTPFilteringRequest = 11, kReadingFirstRequest = 12, kHaveNonTunnelMessage = 13 } 另外,值得注意的是,DSS提供一种称为Module的二次开发模式,开发人员能够编写新的Module而且注册其但愿运行的状态,系统就会在相应的状态下调用该Module,从而将控制权暂时交给二次开发的代码,以便加强系统的功能。简单起见,下面咱们将分析不存在客户模块的Run()函数源代码。首先分析其主框架以下: SInt64 RTSPSession::Run(){ 1 EventFlags events = this->GetEvents(); //取出事件 2 QTSS_Error err = QTSS_NoErr; 3 QTSSModule* theModule = NULL; 4 UInt32 numModules = 0; // 设定当前的Module状态 5 OSThread::GetCurrent()->SetThreadData(&fModuleState); //检查该链接是否超时,若是是就设定状态断掉该链接 6 if ((events & Task::kTimeoutEvent) || (events & Task::kKillEvent)) 7 fLiveSession = false; 8 while (this->IsLiveSession()) //若是链接还没有拆除,执行状态机9 { /* 提示:下面是RTSPSession的状态机。由于在处理RTSP请求过程当中,有多个地方须要Run方法返回以便继续监听新的事件。为此,咱们须要跟踪当前的运行状态,以便在被打断后还能回到原状态*/ 10 switch (fState) 11 { 12 case 状态1: //处理略13 case 状态2: //处理略…14 case 状态n: //处理略 15 } //此处略… } Run函数的主框架比较简单,其核心就在于10~15的状态机,所以咱们但愿按照客户请求到达而且被处理的主要流程为读者描述该状态机的运转。 1第一次请求到达进入kReadingFirstRequest状态,该状态主要负责从RTSPRequestStream类的对象fInputStream中读出客户的RTSP请求,其处理以下: case kReadingFirstRequest: { 1 if ((err = fInputStream.ReadRequest())=QTSS_NoErr) 2 {/* RequestStream返回QTSS_NoErr意味着全部数据已经从Socket中读出,但尚不能构成一个完整的请求,所以必须等待更多的数据到达*/ 3 fInputSocketP->RequestEvent(EV_RE); //接着请求监听读事件 4 return 0; //Run函数返回,等待下一个事件发生 5 } 6 if ((err != QTSS_RequestArrived) && (err != E2BIG)) 7 {//出错,中止处理 8 Assert(err > 0); 9 Assert(!this->IsLiveSession()); 10 break; 11 } //请求已经彻底到达,转入kHTTPFilteringRequest状态 12 if (err = QTSS_RequestArrived) 13 fState = kHTTPFilteringRequest; //接收缓冲区溢出,转入kHaveNonTunnelMessage状态 14 if (err=E2BIG) 15 fState = kHaveNonTunnelMessage; } continue; 2正常状况下,在得到一个完整的RTSP请求后(上第12行),系统将进入kHTTPFilteringRequest状态该状态检查RTSP链接是否须要通过HTTP代理实现;如不须要,转入kHaveNonTunnelMessage状态。 3进入kHaveNonTunnelMessage状态后,系统建立了RTSPRequest类的对象fRequest,该对象解析客户的RTSP请求,并保存各类属性。fRequest对象被传递给其余状态处理。 4接着进入kFilteringRequest状态,二次开发人员能够经过编写Module对客户的请求作出特殊处理。若是客户的请求为正常的RTSP请求,系统调用SetupRequest函数创建用于管理数据传输的RTPSession类对象,其源代码分析以下: void RTSPSession::SetupRequest(){ // 首先分析RTSP请求,细节见RTSPRequest.h/.cpp 1 QTSS_Error theErr = fRequest->Parse();2 if (theErr != QTSS_NoErr) 3 return; //OPTIONS请求,简单发回标准OPTIONS响应便可4 if (fRequest->GetMethod() = qtssOptionsMethod) 5 {//此处略去部分处理代码…6 } //DESCRIBE请求,必须保证已经有了SessionID 7 if (fRequest->GetMethod() = qtssDescribeMethod) 8 { 9 if (fRequest->GetHeaderDictionary()->GetValue(qtssSessionHeader)->Len > 0) 10 { 11 (void)QTSSModuleUtils::SendErrorResponse(fRequest, qtssClientHeaderFieldNotValid, qtssMsgNoSesIDOnDescribe);12 return; 13 }14 } //查找该请求的RTPSession 15 OSRefTable* theMap = QTSServerInterface::GetServer()->GetRTPSessionMap(); 16 theErr = this->FindRTPSession(theMap); 17 if (theErr != QTSS_NoErr) 18 return; //若是未查找到,创建一个新的RTPSession 19 if (fRTPSession= NULL) 20 { 21 theErr = this->CreateNewRTPSession(theMap); 22 if (theErr != QTSS_NoErr) 23 return; 24 } //此处略…} 5进入kRoutingRequest状态,调用二次开发人员加入的Module,用于将该请求路由(Routing)出去。缺省状况下,系统自己对此状态不作处理。 6进入kAuthenticatingRequest状态,调用二次开发人员加入的安全模块,主要用于客户身份验证以及其余如规则的处理。读者若是但愿开发具备商业用途的流式媒体服务器,该模块必须进行二次开发。 7进入kPreprocessingRequest和kProcessingRequest及kPostProcessingRequest状态,这三种状态都是经过调用系统自带或二次开发人员添加的Module来处理RTSP请求,例如系统提供了QTSSReflector Module、QTSSSplitter Module以及QTSSFile Module等模块。其中比较重要的QTSSFile Module属于QTLib库的部分,此处再也不详述。 8进入kSendingResponse状态,用于发送对客户RTSP请求处理完成以后的响应。系统在该状态调用了fOutputStream.Flush()函数将在fOutputStream中还没有发出的请求响应经过Socket端口彻底发送出去。 9进入kCleaningUp状态,清除全部上次处理的数据,并将状态设置为kReadingRequest等待下次请求到达。 RTSPSession的主流程分析完了,但辅助其操做的多个RTSP类还须要读者自行分析,它们分别是:RTSPSessionInterface Class、RTSPRequest Class、RTSPRequestInterface Class、RTSPRequestStream Class以及RTSPResponseStream Class等等。4.2 RTP子系统 RTP标准是实时传输协议(Real-Time Transfer Protocol)的简称,它被客户和流式媒体服务器用来处理流式媒体数据的传输。在介绍RTSP的运行流程时,咱们发现RTSPSession对象经过调用SetupRequest函数为客户创建RTPSession对象。RTPSession类是Task类的子类,所以它重载了Task类的Run函数,该函数经过调用FileModule.cpp文件中的SendPacket()函数向客户发送RTP协议打包的流式媒体数据。当客户经过利用RTSP向RTSPSession对象发出PLAY命令后,RTSPSession对象将调用RTPSession对象的Play()函数。Play函数准备好须要打包发送的数据后,利用Task类的Signal函数传信RTPSession对象,使其被加入某个TaskThread的任务队列,从而运行其Run函数。另外,对同一个节目中的每个独立的RTP流(如音频流或视频流等),DSS都定义了一个RTPStream类与之对应;显然一个RTPSession对象可能包含多个RTPStream对象。整个RTP子系统的核心运行流程见图8。 下面,咱们首先分析RTPSession中Run()函数的用法: SInt64 RTPSession::Run(){ //提示:该函数代码在TaskThread内运行1 EventFlags events = this->GetEvents(); //取出事件2 QTSS_RoleParams theParams; //提供给其余Module运行的参数,第一个成员是对象自己 3 theParams.clientSessionClosingParams.inClientSession = this; //设定本身为当前运行的线程 4 OSThread::GetCurrent()->SetThreadData(&fModuleState); /*若是事件是通知RTPSession对象死亡,就准备自杀。可能致使这种状况的有两种事件:自杀kKillEvent;超时kTimeoutEvent*/ 5 if ((events & Task::kKillEvent) || (events & Task::kTimeoutEvent) || (fModuleDoingAsyncStuff)) 6 { //处理对象自杀代码,此处略… 7 return –1; //返回出错信息,这样析构函数就会被调用,从而让对象彻底死亡 8 } //若是正处于暂停(PAUSE)状态,什么都不作就返回,等待PLAY命令 9 if ((fState == qtssPausedState) || (fModule == NULL)) 10 return 0; //下面代码负责发送数据 11 { //对Session互斥量加锁,防止发送数据过程当中RTSP请求到来 12 OSMutexLocker locker(&fSessionMutex); //设定数据包发送时间,防止被提早发送 13 theParams.rtpSendPacketsParams.inCurrentTime = OS::Milliseconds(); 14 if (fPlayTime > theParams.rtpSendPacketsParams.inCurrentTime) //未到发送时间 15 theParams.rtpSendPacketsParams.outNextPacketTime=fPlayTime- theParams.rtpSendPacketsParams.inCurrentTime; //计算还需多长时间才可运行 16 else 17 { //下次运行时间的缺缺省值为0 18 theParams.rtpSendPacketsParams.outNextPacketTime = 0; // 设置Module状态 19 fModuleState.eventRequested = false; 20 Assert(fModule != NULL); //调用QTSS_RTPSendPackets_Role内的函数发送数据,见FileModule.cpp 21 (void)fModule->CallDispatch(QTSS_RTPSendPackets_Role, &theParams); //将返回值从负数改成0,不然任务对象就会被TaskThread删除 22 if (theParams.rtpSendPacketsParams.outNextPacketTime 23 theParams.rtpSendPacketsParams.outNextPacketTime = 0; 24 } 25 } //返回下一次但愿被运行的时间;返回值含义见前文的分析 26 return theParams.rtpSendPacketsParams.outNextPacketTime;} 从上面分析可见,正常状态下Run函数的返回值有两种:若是返回值为正数,表明下一次发送数据包的时间,规定时间到来的时候,TaskThread线程会自动调用Run函数;若是返回值等于0,在下次任何事件发生时,Run函数就会被调用,这种状况每每发生在全部数据都已经发送完成或者该RTPSession对象将要被杀死的时候。 在第21行咱们看到,Run函数调用了QTSSFileModule中的QTSS_RTPSendPackets_Role发送数据。在QTSSFileModule.cpp文件的QTSSFileModule_Main函数内,系统又调用了SendPackets函数,这才是真正发送RTP数据包的函数,咱们对其代码分析以下: QTSS_Error SendPackets(QTSS_RTPSendPackets_Params* inParams){ //此处略去部分定义… //获得要发送数据的FileSession对象,其定义见QTSSFileModule.cpp文件 1 FileSession** theFile = NULL; 2 UInt32 theLen = 0; 3 QTSS_Error theErr = QTSS_GetValuePtr(inParams->inClientSession, sFileSessionAttr, 0, (void**)&theFile, &theLen); 4 if ((theErr != QTSS_NoErr) || (theLen != sizeof(FileSession*))) //出错 5 { //设定出错缘由,而后断掉链接,并返回 6 QTSS_CliSesTeardownReason reason = qtssCliSesTearDownServerInternalErr; 7 (void) QTSS_SetValue(inParams->inClientSession, qtssCliTeardownReason, 0, &reason, sizeof(reason)); 8 (void)QTSS_Teardown(inParams->inClientSession); 9 return QTSS_RequestFailed; 10 } //该节目文件中音频所能忍受的最大延迟 11 maxDelayToleranceForStream = (*theFile)->fMaxAudioDelayTolerance; 12 while (true) 13 { //不存在待发送数据包,多是文件还没有打开 14 if ((*theFile)->fNextPacket == NULL) 15 { 16 void* theCookie = NULL; //得到第一个数据包,theTransmitTime为传输数据花费的时间 17 Float64 theTransmitTime = (*theFile)->fFile.GetNextPacket(&(*theFile)->fNextPacket, &(*theFile)->fNextPacketLen, &theCookie); 18 if ( QTRTPFile::errNoError != (*theFile)->fFile.Error() ) {//读数据出错,断掉链接,返回。此处略 } … 19 (*theFile)->fStream = (QTSS_RTPStreamObject)theCookie; //获得RTPStream对象 20 (*theFile)->fPacketPlayTime = (*theFile)->fAdjustedPlayTime + ((SInt64)(theTransmitTime * 1000)); //推迟theTransmitTime长度的播放时间 21 (*theFile)->fPacketWasJustFetched = true; 22 if ((*theFile)->fNextPacket != NULL) 23 { // 判断流格式 24 QTSS_RTPPayloadType* thePayloadType = NULL; 25 QTSS_Error theErr = QTSS_GetValuePtr( (*theFile)->fStream, qtssRTPStrPayloadType, 0, (void**)&thePayloadType, &theLen ); //设定视频流可忍受的最大延迟时间 26 if (*thePayloadType == qtssVideoPayloadType) 27 maxDelayToleranceForStream = (*theFile)->fMaxVideoDelayTolerance; 28 } 29 } //仍无数据,说明全部数据已经传输完成了 30 if ((*theFile)->fNextPacket = NULL) 31 { //向fStream中写入长度为0的空数据,以便强制缓冲区刷新 32 (void)QTSS_Write((*theFile)->fStream, NULL, 0, NULL, qtssWriteFlagsIsRTP); 33 inParams->outNextPacketTime = qtssDontCallSendPacketsAgain; 34 return QTSS_NoErr; //完成任务返回 35 } //提示:开始发送RTP数据包 //计算当前时间和该段数据应该发送的时间之间的相对间隔 36 SInt64 theRelativePacketTime = (*theFile)->fPacketPlayTime - inParams->inCurrentTime; // inCurrentTime = OS::Milliseconds(); 37 SInt32 currentDelay = theRelativePacketTime * -1L; //计算传输延迟 38 theErr = QTSS_SetValue( (*theFile)->fStream, qtssRTPStrCurrentPacketDelay, 0, ¤tDelay, sizeof(currentDelay) ); //保存该延迟 //若是延迟过大,就丢弃该包,等待发送下一个数据包 39 if (theRelativePacketTime > sMaxAdvSendTimeInMsec) 40 { 41 Assert( theRelativePacketTime > 0 ); 42 inParams->outNextPacketTime = theRelativePacketTime; 43 return QTSS_NoErr; 44 } //此处略去部分处理视频质量的代码… // 发送当前数据包 45 QTSS_Error writeErr = QTSS_Write((*theFile)->fStream, (*theFile)->fNextPacket, (*theFile)->fNextPacketLen, NULL, qtssWriteFlagsIsRTP); //其他代码略…} RTP子系统是DSS中最为复杂的部分之一,这是由于发送RTP数据包的过程不但涉及到网络接口,并且和文件系统有着密切的关系。DSS的一个重要特征就是可以将线索化(Hinted)过的QuickTime文件经过RTSP和RTP协议流化出去。全部分析这些文件的代码都被提取出来而且封装在QTFile库中。这种封装方式使得系统的各个部分都变得简单:QTFile负责处理文件的分析;而DSS其余部分负责处理网络和协议。服务器中的RTPFileModule调用QTFile库检索索引过的QuickTime文件的数据包和元数据。QTFile库的讲解超出了本文的范围,可是但愿让DSS支持其余媒体格式的读者可以掌握它的实现机制。5 DSS二次开发接口:Module开发流程 做为一个运行于多个操做系统平台的开发源代码的服务器,DSS提供了一种称为Module的二次开发接口。使用这个开发接口,咱们能够充分利用服务器的可扩展性及其实现的多种协议,而且可以保证和未来版本兼容。DSS中的许多核心功能也是以Module的方式预先实现而且编译的,所以能够说对Module的支持已经被设计到DSS的内核中去了。 下面咱们将分析DSS的一个内嵌Module:QTSSFileModule的源代码来讲明Module的编程方式,QTSSFileModule的实如今QTSSFileModule.cpp文件中。 每一个QTSS Module必须实现两个函数:首先,每一个QTSS Module必须实现一个主函数,服务器调用该函数用于启动和初始化模块中的QTSS函数;QTSSFileModule主函数的实现以下:QTSS_Error QTSSFileModule_Main(void* inPrivateArgs){ return _stublibrary_main(inPrivateArgs, QTSSFileModuleDispatch);}其中QTSSFileModuleDispatch是Module必须实现的分发函数名。另外一个须要实现的是分发函数,服务器调用该函数实现某个特殊任务。此时,服务器将向分发函数传入任务的名字和一个任务相关的参数块。QTSSFileModule分发函数的实现以下:QTSS_Error QTSSFileModuleDispatch(QTSS_Role inRole, QTSS_RoleParamPtr inParamBlock){ //根据传入的任务名称和入参执行相应的处理函数 switch (inRole) //任务名称 { case QTSS_Register_Role: return Register(&inParamBlock->regParams); case QTSS_Initialize_Role: return Initialize(&inParamBlock->initParams); case QTSS_RereadPrefs_Role: return RereadPrefs(); case QTSS_RTSPRequest_Role: return ProcessRTSPRequest(&inParamBlock->rtspRequestParams); case QTSS_RTPSendPackets_Role: return SendPackets(&inParamBlock->rtpSendPacketsParams); case QTSS_ClientSessionClosing_Role: return DestroySession(&inParamBlock->clientSessionClosingParams); } return QTSS_NoErr;} 其中,分发函数的入参是一个联合,它根据任务名称的不一样,具体的数据结构也不一样,下面是该数据结构的定义: typedef union{ QTSS_Register_Params regParams; QTSS_Initialize_Params initParams; QTSS_ErrorLog_Params errorParams; //此处略去其余多个数据结构…} QTSS_RoleParams, *QTSS_RoleParamPtr; DSS提供了两种方式把咱们本身开发的Module添加到服务器中:一种称为静态模块(Static Module),该方式将咱们开发的Module代码直接编译到内核中去;另外一种称为动态模块(Dynamic Module),该方式将咱们开发的Module单独编译称为一个动态库,而后修改配置,使服务器在启动时将其加载。图9描述了DSS启动和关闭时模块调用流程。 当服务器启动时,它首先装载没有被编译进内核的动态模块,而后才装载被编译进内核的静态模块;因为现有的大部分系统功能都是以静态模块的方式存在的,若是你但愿用本身的模块替换某个系统功能,最好是编写一个动态模块,由于它们将早于静态模块被装载。 不管是静态模块仍是动态模块,它们的代码都是相同的,惟一的不一样就是它们的编译方式。首先为了将静态模块编译到服务器中,咱们必须修改QTSServer.cpp文件中的QTSServer::LoadCompiledInModules,并向其中加入如下代码: QTSSModule* myModule=new QTSSModule(*_XYZ_*); (void)myModule->Initialize(&sCallbacks,&_XYZMAIN_); (void)AddModule(MyModule); 其中,XYZ是静态模块的名字,而XYZMAIN则是其主函数入口。 动态模块的编译方法以下:首先单独编译动态模块为一个动态共享库;将该共享库与QTSS API stub library连接到一块儿;最后将结果文件放置到/usr/sbin/QTSSModules目录中去。此后,服务器在启动时就将自动调用该动态模块。6 结束语DSS是一项十分庞大的工程,并且随着新版本的不断推出和功能的加强,其内容也愈来愈丰富。限于篇幅,本文只是介绍了一些笔者认为比较重要的模块或类,但愿可以配合读者更好的掌握DSS的精髓。咱们之因此研究DSS的源代码,基本上有两个目标:一是但愿利用DSS做为平台进行二次开发,如增长对媒体格式的支持,增长客户身份认证,增长对媒体内容的管理等模块,使DSS成为一个符合实际需求的实用系统。抱此目的的读者在掌握DSS总体流程的基础上,应着重于其二次开发平台(如Module)以及底层文件和媒体格式支持库的研究。另外一类读者可能但愿经过研究DSS源代码,掌握在Internet环境中处理流式媒体的关键技术,以便为未来开发相关底层应用作准备。对于这些读者,笔者认为须要下更多的功夫去研究DSS源代码中的许多细节部分:例如高级网络编程(Socket)、多线程之间的通讯、任务调度、系统资源(CPU、磁盘等)的合理利用以及用于流式媒体的多个标准协议(RTP/RTCP、RTSP、SDP)的具体实现等等。做为三大主要流式媒体应用中惟一一个开放源代码的产品,DSS让开发人员可以从最底层研究流式媒体技术,事实上,当前国内外许多公司正是在DSS的基础上开发了本身的流式媒体相关产品。可是须要指出,做为一个开放源代码的工程,DSS的分发和开发须遵循苹果公司给出的一份版权文件(Apple Public Source License),但愿进行商业化开发的读者应该仔细研读,该文件可从如下网址得到:http://www.publicsource.apple.com。最后,若是读者但愿跟踪DSS的最新进展,能够申请加入其邮件列表。经过该邮件列表,读者能够和全球众多的DSS开发人员交流经验,并且苹果公司的技术人员将会按期的解答各类问题。该邮件列表的地址为:http://www.lists.apple.com。