Kafka使用基于TCP的二进制协议,该协议定义了全部API的请求及响应消息。全部的消息都是经过长度来分隔,而且由后面描述的基本类型组成。html
RequestOrResponse --> Size( RequestMessage | ResponseMessage ) Size --> int32
Size给出了后续请求或响应消息的字节长度。
解析时应先读取4字节的长度N,而后读取并解析后续的N字节请求/响应内容。node
RequestMessage --> request_header request request_header --> api_key api_version correlation_id client_id api_key --> int16 api_version --> int16 correlation_id -->int32 client_id --> string request --> MetadataRequest | ProduceRequest | FetchRequest | ...
api_key: 这是一个表示所调用的API的数字ID(即不一样类型的请求有不一样的ID)
api_version: 这是该API的一个数字版本号,Kafka为每一个API定义一个版本号,该版本号容许服务器根据版本号正确地解释请求内容。
correlation_id: 这是一个用户提供的整数。它将被服务器原封不动的传回给客户端。用于匹配客户端和服务器之间的请求和响应。
client_id: 这是客户端应用程序自定义的标识
request: 具体的请求内容,例如元数据请求、生产请求、获取消息请求、偏移量请求等。apache
ResponseMessage --> response_header response response_header --> correlation_id correlation_id --> int32 response --> MetadataResponse | ProduceResponse | ...
correlation_id: 即请求中携带的correlation_id
response: 具体的响应内容api
MessageSet --> [Offset MessageSize Message] Offset --> int64 MessageSize --> int32 Message --> Crc MagicByte Attributes Key Value Crc --> int32 MagicByte --> int8 Attributes --> int8 Key --> bytes Value --> bytes
Offset: 这是在Kafka中做为日志序列号使用的偏移量。
Crc:Crc是的剩余消息字节的CRC32值。broker和消费者可用来检查信息的完整性。
MagicByte: 这是一个用于容许消息二进制格式的向后兼容演化的版本id。当前值是0。 Attributes: 这个字节保存有关信息的元数据属性。
最低的3位包含用于消息的压缩编解码器。
第四位表示时间戳类型,0表明CreateTime,1表明LogAppendTime。生产者必须把这个位设成0。
全部其余位必须被设置为0。
Timestamp: 消息的时间戳。时间戳类型在Attributes域中体现。单位为从UTC标准准时间1970年1月1日0点到所在时间的毫秒数。
Key: Key是一个可选项,它主要用来进行指派分区。Key能够为null。
Value: Value是消息的实际内容,类型是字节数组。
注:此格式为V0版本的格式,V1详见官方文档,一样后续具体请求格式均以V0版本进行说明数组
SaslHandshake Request --> mechanism mechanism --> string
mechanism: 客户端选择的SASL机制服务器
SaslHandshake Response --> error_code [enabled_mechanisms] error_code --> int16 enabled_mechanisms --> string
error_code: 错误码
enabled_mechanisms: 服务端使能的SASL mechanisms列表ide
Metadata Request --> [topics] topics --> string
topics: 要获取元数据的topic数组。 若是为空,则返回全部topic的元数据。性能
Metadata Response --> [brokers] [topic_metadata] brokers --> node_id host port node_id --> int32 host --> string port --> int32 topic_metadata --> error_code topic [partition_metadata] error_code --> int16 topic --> string partition_metadata --> error_code partition leader [replicas] [isr] error_code --> int16 partition --> int32 leader --> int32 replicas --> int32 isr --> int32
node_id: broker的id, 即kafka节点的ID
host: broker的主机名
port: broker侦听端口
partition: topic对应partition的id
leader: 该topic全部partition中,扮演leader角色的partition所在的broker的id
replicas: 全部partition中,做为slave的节点集合
isr: 副本集合中,与leader处于跟随状态的节点集合fetch
ProduceRequest --> asks timeout [topic [partition messageSetSize MessageSet]] acks --> int16 timeout --> int32 topic --> string partition --> int32 messageSetSize --> int32
acks: 这个值表示服务端收到多少确认后才发送反馈消息给客户端,若是设置为0,即服务端不发送response;若是设置为1,即服务端将等到数据写入本地日志后发送response;若是设置为-1,服务端将阻塞,直到这个消息被全部的同步副本写入后再发送response
timeout: 这个值提供了以毫秒为单位的超时时间
topic:该数据将会发布到的topic名称
partition:该数据将会发布到的分区
messageSetSize: 后续消息集的长度,单位是字节
messageSet:前面描述的标准格式的消息集合ui
ProduceResponse --> [responses] responses --> topic [partition_responses] topic --> string partition_responses --> partition error_code base_offset partition --> int32 error_code --> int16 base_offset --> int64
topic: 此响应对应的topic名称
partition: topic对应的partition的id
base_offset: produce的消息在partition中的偏移
Fetch Request --> replica_id max_wait_time min_bytes [topics] replica_id --> int32 max_wait_time --> int32 min_bytes --> int32 topics --> topic [partitions] partitions --> partition fetch_offset max_bytes partition --> int32 fetch_offset --> int64 max_bytes --> int32
replica_id: 发起这个请求的副本节点ID
max_wait_time: 若是没有足够的数据可发送时,最大阻塞等待时间,以毫秒为单位
min_bytes: 返回响应消息的最小字节数目,必须设置。若是客户端将此值设为0,服务器将会当即返回,但若是没有新的数据,服务端会返回一个空消息集。若是它被设置为1,则服务器将在至少一个分区收到一个字节的数据的状况下当即返回,或者等到超时时间达到。经过设置较高的值,结合超时设置,消费者能够在牺牲一点实时性能的状况下经过一次读取较大的字节的数据块从而提升的吞吐量
topic: topic的名称
partition: topic对应partition的id
fetch_offset: 获取数据的起始偏移量
max_bytes: 此分区返回消息集所能包含的最大字节数。
Fetch Response --> [responses] responses --> topic [partition_responses] topic --> string partition_responses --> partition_header messageSet partition_header --> partition error_code high_watermark parttion --> int32 error_code --> int16 high_watermark --> int64
high_watermark: 该partition分区中最后提交的消息的偏移量。此信息可被客户端用来肯定后面还有多少条消息。
ListOffsets Request --> replica_id [topics] replica_id --> int32 topics --> topic [partitions] topic --> string partitions --> partition timestamp max_num_offsets partition --> int32 timestamp --> int64 max_num_offsets --> int32
timestamp: 用来请求必定时间(单位:毫秒)前的全部消息
-1表示获取最新的offset,即下一个produce的消息的offset
-2表示获取最先的有效偏移量
ListOffsets Response --> [responses] responses --> topic [partition_responses] topic --> string partition_responses -->partition error_code [offsets] partition --> int32 error_code --> int16 offsets --> int64