最近几个月断更了,把精力放在了新的开源项目上,一个用rust写的流媒体服务xiu。
实现过程当中踩了很多坑,今天说下rtmp中的chunk。git
RTMP协议确实复杂,在作这个项目以前,看过不少帖子,看过官方文档,但老是感受不能完全的理解清楚,在实现过一遍此协议以后,感受清楚了很多。github
目前作的测试还不够多,却是发现了一些问题。chunk这个东西看了好久可能不少人仍是不明白,说明一下,RTMP 协议除了3次握手数据,其它的,包括信令和媒体数据(音视频相关的数据),都会被封装成chunk块。服务器
TCP发送数据不是按照协议信令,一次只发送一个信令,有时候会发送多个,rtmp握手阶段从TCP流中读一次数据,握手结束后,会留下一部分数据,这部分要填到chunk解析缓冲数据中。网络
初始化的chunk size要设置成128。tcp
个人测试和排查过程记录以下:
我一开始的chunk size设置成了4096,用ffplay播放流,发送connect信令的时候,老是会多出一个byte,致使amf解析失败,用wireshark抓包,这个byte是没有的,一开始认为wireshark是不会出错的,觉得tokio网络库,因而换成了tcp基础库,这个byte仍是存在,想了个笨方法,找到一个开源的rtmp服务器,也打印出此信令,刚收到tcp数据的时候,这个byte也有,可是amf解析却成功了,接下来就是把每一步的数据都打印出来,从解析chunk到解析amf. 看看这个byte到底是在哪一个步骤消失的,最后发现,这个byte是chunk的第一个byte,fmt+csid,初始化的chunk size不对。。测试
解释状态保留以前说一下chunk的各部分组成,按照官方的文档,chunk由四部分组成:this
前三部分是均可以压缩的。code
/****************************************************************** * 5.3.1.1. Chunk Basic Header * The Chunk Basic Header encodes the chunk stream ID and the chunk * type(represented by fmt field in the figure below). Chunk type * determines the format of the encoded message header. Chunk Basic * Header field may be 1, 2, or 3 bytes, depending on the chunk stream * ID. * * The bits 0-5 (least significant) in the chunk basic header represent * the chunk stream ID. * * Chunk stream IDs 2-63 can be encoded in the 1-byte version of this * field. * 0 1 2 3 4 5 6 7 * +-+-+-+-+-+-+-+-+ * |fmt| cs id | * +-+-+-+-+-+-+-+-+ * Figure 6 Chunk basic header 1 * * Chunk stream IDs 64-319 can be encoded in the 2-byte version of this * field. ID is computed as (the second byte + 64). * 0 1 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * |fmt| 0 | cs id - 64 | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * Figure 7 Chunk basic header 2 * * Chunk stream IDs 64-65599 can be encoded in the 3-byte version of * this field. ID is computed as ((the third byte)*256 + the second byte * + 64). * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * |fmt| 1 | cs id - 64 | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * Figure 8 Chunk basic header 3 * * cs id: 6 bits * fmt: 2 bits * cs id - 64: 8 or 16 bits * * Chunk stream IDs with values 64-319 could be represented by both 2- * byte version and 3-byte version of this field. ***********************************************************************/
第一个byte的前两个bit是format,有0,1,2,3四个值,这个四个值的做用是压缩message header,详细的会在下面说,后6个bit是chunk stream ID, 简称csid(关于这个字段有坑,下面会解释),6个bit的取值范围为[0,63] ,0和1有特殊用途,2到63表示真正的csid,关于特殊值0和1:orm
解析代码以下:视频
let mut csid = (byte & 0b00111111) as u32; match csid { 0 => { if self.reader.len() < 1 { return Ok(UnpackResult::NotEnoughBytes); } csid = 64; csid += self.reader.read_u8()? as u32; } 1 => { if self.reader.len() < 1 { return Ok(UnpackResult::NotEnoughBytes); } csid = 64; csid += self.reader.read_u8()? as u32; csid += self.reader.read_u8()? as u32 * 256; } _ => {} }
下面说下message header, 这部分比较复杂,有四种类型,对应着basic header里面的format字段的0~3。
/*****************************************************************/ /* 5.3.1.2.1. Type 0 */ /***************************************************************** 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | timestamp(3bytes) |message length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | message length (cont)(3bytes) |message type id| msg stream id | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | message stream id (cont) (4bytes) | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ *****************************************************************/
任何字段都不省略。
/*****************************************************************/ /* 5.3.1.2.2. Type 1 */ /***************************************************************** 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | timestamp(3bytes) |message length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | message length (cont)(3bytes) |message type id| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ *****************************************************************/
省略了message stream id,使用上一个chunk的数据。
/************************************************/ /* 5.3.1.2.3. Type 2 */ /************************************************ 0 1 2 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | timestamp(3bytes) | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ***************************************************/
更绝了,省略了message stream id、message length和 message type id,这个也从前边的chunk读。
3 啥都没有,全从前边拿。
这个字段是可选的,占用4个byte,若是message header里面的timestamp字段大于0xFFFFFF,则读取这个字段。
最后是payload,payload的长度由 message header里面的message length决定。
chunk块的整个读取流程以下,一开始个人实现流程是这样的(有问题)
好了,整个流程基本上介绍清楚了。大标题里面的状态保留我这里有两个意思,第一个意思是要说明一下我上面表述的问题。我说的是『从上一个chunk块』拿省略的字段,这里是不对的,由于有下面这种状况存在:
+--------+---------+-----+------------+------- ---+------------+ | | Chunk |Chunk|Header Data |No.of Bytes|Total No.of | | |Stream ID|Type | | After |Bytes in the| | | | | |Header |Chunk | +--------+---------+-----+------------+-----------+------------+ |Chunk#1 | 3 | 0 | delta: 1000| 32 | 44 | | | | | length: 32,| | | | | | | type: 8, | | | | | | | stream ID: | | | | | | | 12345 (11 | | | | | | | bytes) | | | +--------+---------+-----+------------+-----------+------------+ |Chunk#2 | 3 | 2 | 20 (3 | 32 | 36 | | | | | bytes) | | | +--------+---------+-----+----+-------+-----------+------------+ |Chunk#3 | 4 | 3 | none (0 | 32 | 33 | | | | | bytes) | | | +--------+---------+-----+------------+-----------+------------+ |Chunk#4 | 3 | 3 | none (0 | 32 | 33 | | | | | bytes) | | | +--------+---------+-----+------------+-----------+------------+
注意:message header里面的字段复用是针对chunk stream ID的。
所以上面的状况,chunk2 能够复用 chunk1的message header,可是chunk 4不能复用chunk 3的,因此,在代码里面要特殊处理,每一个csid的message header都须要保存一份,每解析一个chunk,读完basic header以后,须要把这个csid的上一个message header先恢复出来。
第二种状况也是我写代码时未曾想到的:
tcp数据包能够在任何地方拆分。
也就是说,可能一个chunk还没读完,此次的tcp数据就用完了,须要等下一次的数据,这种状况就要保留读取各个字段的状态了。每个读取操做就应该设置一个标记,所以写了下面的四个大状态,message header里面有4个小的状态。
#[derive(Copy, Clone)] enum ChunkReadState { ReadBasicHeader = 1, ReadMessageHeader = 2, ReadExtendedTimestamp = 3, ReadMessagePayload = 4, Finish = 5, } #[derive(Copy, Clone)] enum MessageHeaderReadState {x'x ReadTimeStamp = 1, ReadMsgLength = 2, ReadMsgTypeID = 3, ReadMsgStreamID = 4, }
例如: ReadExtendedTimestamp占用4个bytes,可是读到这里的时候就还剩下2个bytes,就要保留这个状态,下次从TCP里面读出新数据的时候从这个状态开始。
最后rtmp chunk解析的rust完整实如今这里
最后,欢迎star。