tar -xzvf kafka_2.11-2.1.0.tgz cd kafka_2.11-2.1.0 ./bin/zookeeper-server-start.sh config/zookeeper.properties ./bin/kafka-server-start.sh config/server.properties
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
指定topic后,发送、接收消息缓存
设计一个消息引擎系统须要考虑两个重要因素:
(1)消息设计
(2)传输协议设计服务器
为了可以更好的表达语义,最大限度提升重用性,一般采用结构化的方式进行设计
例如:SOAP协议中消息采用XML格式,Web Service支持JSON格式消息
kafka消息用二进制方式保存的,但依然是结构化的消息网络
狭义角度,指定了消息在不一样系统之间传输的方式,主流的协议包含AMQP,Web Service + SOAP, MSMQ(微软)等
广义角度,可能包括任何可以在不一样系统之间传输消息或者执行语义操做的协议或者框架,例如,RPC和序列化框架,
包含Protocol Buffers(简写Google PB, Google)、Dubbo(阿里)等
Kafka设计了一套二进制的消息传输协议,没有采用Google PB等框架架构
传输协议做为一个基础构建快,服务于消息引擎系统实现的消息引擎范型
一个消息引擎范型是一个基于网络的架构范型,描述了消息引擎系统的两个不一样子系统如何互连、交互
最多见的消息引擎范型是(1)消息队列模型(2)发布/订阅模型app
发布订阅模型,有主题(topic)概念,一个topic能够表示消息的容器,发布者将消息生产出来发送到指定的topic中,全部订阅了该topic的订阅者均可以接收到该topic下的全部消息,生活中的报纸订阅能够当作是发布/订阅模型的典型负载均衡
kafka同时支持这两种引擎模型框架
JAVA消息服务(JMS),是一套API规范,提供了接口用于实现分布式系统间的消息传递,同时支持3.3中的两种消息引擎模型分布式
kafka设计之初衷是为了解决互联网公司的超大量数据的实时传输,考虑如下4个方面:
(1)吞吐量/延时
(2)消息持久化
(3)负载均衡和故障转移
(4)伸缩性性能
批处理(batching),能够显著提高吞吐量
kafka写入操做很是快,得益于它对磁盘的使用方法的不一样,虽然kafka会持久化全部数据到磁盘,可是本质上,每次写入操做都只是把数据写入到操做系统的页缓存(page cache)中,而后由操做系统自行决定什么时候将页缓存中的数据写回到磁盘上,这样有3个优势:
(1)操做系统页缓存是在内存中分配的,消息写入速度很是快;
(2)Kafka没必要直接和底层的文件系统打交道,全部琐碎的IO操做交给操做系统来处理
(3)kafka写入操做采用追加写入(append)方式,避免了磁盘随机写操做
普通的物理磁盘(非固态硬盘),随机读写的吞吐量很慢,可是磁盘的顺序读写操做其实很是快,能够匹敌内存的随机IO速度
kafka在设计时候采用了追加写入消息的方式,只能在日志文件的末尾追加写入新的消息,不容许修改已经写入的消息,它属于典型的磁盘顺序访问型操做,显著提升kafka的吞吐量,实际使用过程当中,能够轻松实现每秒几万或者几十万条消息
kafka在读消息时候,首先尝试从操做系统的页缓存中读取,若是命中将消息经页缓存直接发送到网络Socket上,这个过程使用Linux的sendfile系统调用,便是零拷贝(zero copy) 技术
消息持久化的好处是:
(1)解耦消息发送和消息消费,本质上,kafka核心功能是提供了生产者/消费者模式的完整解决方案,经过将消息持久化使得生产者再也不须要直接和消费者方耦合,只是简单的把消息生产出来交由kafka服务器保存,提高了总体的吞吐量
(2)实现灵活的消息处理,消息持久化能够方便实现消息重演(message replay,对于已经处理过的消息可能在将来的某个时间点从新处理一次)
普通的系统在实现持久化时候,可能会先尽可能使用内存,当内存资源耗尽时候,在一次性将数据刷盘,kafka则是,全部数据都会当即写入文件系统的持久化日志中,以后kafka服务器才会返回结果给客户端通知消息成功写入
这样,既实时保存了数据,又减小了kafka程序对于内存的消耗,将节省的内存留给页缓存使用,提升总体性能
kafka实现负载均衡经过智能化分区领导者选举实现(partition leader election)
kafka服务器支持故障转移的方式是使用了会话机制
每一个kafka服务器启动以后,将以会话的形式把本身注册到Zookeeper服务器上,一旦该服务器出现故障,与Zookeeper之间的会话由于超时失效,kafka集群将选举出另一个服务器彻底代替故障服务器
伸缩性(scalability),表示向分布式系统中添加额外的计算资源时吞吐量提高的能力
每一个kafka服务器上的状态统一交由Zookeeper保管,扩展kafka集群只须要,启动新的kafka服务器
kafka服务器并非全部的状态都不保存,只是保存了很轻量级的内部状态,所以整个集群间的维护状态一致性代价很低
消息由消息头部、key、value组成,消息头部包含消息的CRC码,消息版本号,属性,时间戳,键长度,消息体长度等信息,
key,消息键,对消息作partition时候使用,即决定消息被保存在某个topic下的哪一个partition
value,消息体,保存实际的消息数据
timestamp,消息发送的时间戳,用于流式处理以及其余依赖事件的处理语义,若是不指定取当前时间
消息的属性字段,分配了1个字节,目前使用了最低3位保存消息的压缩类型,其余5位没有使用,目前支持的压缩类型
0(无压缩),1(GZIP),2(Snappy),3(LZ4)
kafka在消息设计时候,特地避开了繁重的JAVA堆上内存分配,直接使用紧凑二进制字节数组ByteBuffer而不是独立的对象,至少可以访问多一倍的可用内存,kafka官网,一台32GB内存的机器,kafka几乎能够用到28~30GB的物理内存
大量使用页缓存而非堆内存还有一个好处,当出现kafka broker进程崩溃时候,堆内存上的数据一并消失,可是页缓存的数据依然存在,下次kafka broker重启后能够继续提供服务,不须要单独热缓存
从概念上,topic只是一个逻辑概念,表明了一类消息,能够认为是消息被发送到的地方,使用topic来区分实际业务
topic一般被多个消费者订阅,出于性能考虑,kafka不是topic message两层结构,而是使用topic partition message 三层结构分散负载,
topic由多个partition组成,partition是不可修改的有序消息序列,每一个partition有专属的partition号,一般由0开始,用户对partition惟一能作的操做是在消息序列的尾部追加写入消息,partition上的每一条消息被分配一个惟一的序列号,该序列号称为位移(offset),从0开始顺序递增的整数,位移信息能够惟必定位到某个partition下的一条消息
topic partition下的每一个消息都被分配了一个位移值,
kafka消费者端也有位移的概念,每条消息的某个partition的位移是固定的,但消费该partition的消费者的位移会随着消费进度不断前移(不超过partition中的最新一个消息的位移)
kafka中的一条消息其实就是一个<topic, partition, offset>三元组(tuple)
kafka的副本(replica)分红两个角色:领导者和追随者
这种角色设定几乎彻底取代了过去的主备提法(master-slave),与传统的主备系统(MySQL)不一样的是,
在这类系统中,一般只有leader对外提供服务,follower存在的惟一价值是充当leader的候补,一旦leader挂掉当即有一个follower被选举成新的leader