Apache Kafka最先是由LinkedIn开源出来的分布式消息系统,如今是Apache旗下的一个子项目,而且已经成为开源领域应用最普遍的消息系统之一。Kafka社区很是活跃,从0.9版本开始,Kafka的标语已经从“一个高吞吐量,分布式的消息系统”改成"一个分布式流平台"。java
Kafka和传统的消息系统不一样在于:git
kafka是一个分布式系统,易于向外扩展。github
它同时为发布和订阅提供高吞吐量面试
它支持多订阅者,当失败时能自动平衡消费者apache
消息的持久化bootstrap
kafka和其余消息队列的对比:安全
|
kafkabash |
activemq服务器 |
rabbitmq网络 |
rocketmq |
---|---|---|---|---|
背景 |
Kafka 是LinkedIn 开发的一个高性能、分布式的消息系统,普遍用于日志收集、流式数据处理、在线和离线消息分发等场景 |
ActiveMQActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通讯。 |
RabbitMQ是一个由erlang开发的AMQP协议(Advanced Message Queue )的开源实现。 |
RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,已经于2016年11月成为 Apache 孵化项目 |
开发语言 |
java,scala |
Java |
Erlang |
Java |
协议支持 |
本身制定的一套协议 |
JMS协议 |
AMQP |
JMS、MQTT |
持久化支持 |
支持 |
支持 |
支持 |
支持 |
事务支持 |
0.11.0以后支持 |
支持 |
支持 |
支持 |
producer容错 |
在kafka中提供了ack配置选项, request.required.acks=-1,级别最低,生产者不须要关心是否发送成功 request.required.acks=0,只须要leader分区有了便可 request.required.acks=1,isr集合中的全部同步了才返回 可能会有重复数据 |
发送失败后便可重试 |
有ack模型 ack模型可能重复消息 事务模型保证彻底一致 |
和kafka相似 |
吞吐量 |
kafka具备高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量操做,具备O(1)的复杂度,消息处理的效率很高 |
rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不同,rabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的操做;基于存储的可靠性的要求存储能够采用内存或者硬盘。 |
kafka在topic数量很少的状况下吞吐量比rocketMq高,在topic数量多的状况下rocketMq比kafka高 |
|
负载均衡 |
kafka采用zookeeper对集群中的broker、consumer进行管理,能够注册topic到zookeeper上;经过zookeeper的协调机制,producer保存对应topic的broker信息,能够随机或者轮询发送到broker上;而且producer能够基于语义指定分片,消息发送到broker的某分片上 |
|
rabbitMQ的负载均衡须要单独的loadbalancer进行支持 |
NamerServer进行负载均衡 |
producer
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class UserKafkaProducer extends Thread
{
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();
public UserKafkaProducer(String topic)
{
props.put("metadata.broker.list", "localhost:9092");
props.put("bootstrap.servers", "master2:6667");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<Integer, String>(props);
this.topic = topic;
}
@Override
public void run() {
int messageNo = 1;
while (true)
{
String messageStr = new String("Message_" + messageNo);
System.out.println("Send:" + messageStr);
//返回的是Future<RecordMetadata>,异步发送
producer.send(new ProducerRecord<Integer, String>(topic, messageStr));
messageNo++;
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
复制代码
Properties props = new Properties();
/* 定义kakfa 服务的地址,不须要将全部broker指定上 */
props.put("bootstrap.servers", "localhost:9092");
/* 制定consumer group */
props.put("group.id", "test");
/* 是否自动确认offset */
props.put("enable.auto.commit", "true");
/* 自动确认offset的时间间隔 */
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
/* key的序列化类 */
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* value的序列化类 */
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* 定义consumer */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* 消费者订阅的topic, 可同时订阅多个 */
consumer.subscribe(Arrays.asList("foo", "bar"));
/* 读取数据,读取超时时间为100ms */
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
复制代码
对于kafka的架构原理咱们先提出几个问题?
1.Kafka的topic和分区内部是如何存储的,有什么特色?
2.与传统的消息系统相比,Kafka的消费模型有什么优势?
3.Kafka如何实现分布式的数据存储与数据读取?
在一套kafka架构中有多个Producer,多个Broker,多个Consumer,每一个Producer能够对应多个Topic,每一个Consumer只能对应一个ConsumerGroup。
整个Kafka架构对应一个ZK集群,经过ZK管理集群配置,选举Leader,以及在consumer group发生变化时进行rebalance。
名称 |
解释 |
---|---|
Broker |
消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker能够组成一个Kafka集群 |
Topic |
主题,Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都须要指定一个topic |
Producer |
消息生产者,向Broker发送消息的客户端 |
Consumer |
消息消费者,从Broker读取消息的客户端 |
ConsumerGroup |
每一个Consumer属于一个特定的Consumer Group,一条消息能够发送到多个不一样的Consumer Group,可是一个Consumer Group中只能有一个Consumer可以消费该消息 |
Partition |
物理上的概念,一个topic能够分为多个partition,每一个partition内部是有序的 |
在Kafka中的每一条消息都有一个topic。通常来讲在咱们应用中产生不一样类型的数据,均可以设置不一样的主题。一个主题通常会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者均可以接收到生产者写入的新消息。
kafka为每一个主题维护了分布式的分区(partition)日志文件,每一个partition在kafka存储层面是append log。任何发布到此partition的消息都会被追加到log文件的尾部,在分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,也就是咱们的offset,offset是一个long型的数字,咱们经过这个offset能够肯定一条在该partition下的惟一消息。在partition下面是保证了有序性,可是在topic下面没有保证有序性。
在上图中在咱们的生产者会决定发送到哪一个Partition。
1.若是没有Key值则进行轮询发送。
2.若是有Key值,对Key值进行Hash,而后对分区数量取余,保证了同一个Key值的会被路由到同一个分区,若是想队列的强顺序一致性,可让全部的消息都设置为同一个Key。
消息由生产者发送到kafka集群后,会被消费者消费。通常来讲咱们的消费模型有两种:推送模型(psuh)和拉取模型(pull)
基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,可是这种方式没法很好地保证消费的处理语义。好比当咱们把已经把消息发送给消费者以后,因为消费进程挂掉或者因为网络缘由没有收到这条消息,若是咱们在消费代理将其标记为已消费,这个消息就永久丢失了。若是咱们利用生产者收到消息后回复这种方法,消息代理须要记录消费状态,这种不可取。若是采用push,消息消费的速率就彻底由消费代理控制,一旦消费者发生阻塞,就会出现问题。
Kafka采起拉取模型(poll),由本身控制消费速度,以及消费的进度,消费者能够按照任意的偏移量进行消费。好比消费者能够消费已经消费过的消息进行从新处理,或者消费最近的消息等等。
单线程模式适用于并发连接数小,逻辑简单,数据量小。
在kafka中,consumer和producer都是使用的上面的单线程模式。这种模式不适合kafka的服务端,在服务端中请求处理过程比较复杂,会形成线程阻塞,一旦出现后续请求就会没法处理,会形成大量请求超时,引发雪崩。而在服务器中应该充分利用多线程来处理执行逻辑。
在kafka服务端采用的是多线程的Selector模型,Acceptor运行在一个单独的线程中,对于读取操做的线程池中的线程都会在selector注册read事件,负责服务端读取请求的逻辑。成功读取后,将请求放入message queue共享队列中。而后在写线程池中,取出这个请求,对其进行逻辑处理,即便某个请求线程阻塞了,还有后续的县城从消息队列中获取请求并进行处理,在写线程中处理完逻辑处理,因为注册了OP_WIRTE事件,因此还须要对其发送响应。
在Kafka中保证高可靠模型的依靠的是副本机制,有了副本机制以后,就算机器宕机也不会发生数据丢失。
kafka一个topic下面的全部消息都是以partition的方式分布式的存储在多个节点上。同时在kafka的机器上,每一个Partition其实都会对应一个日志目录,在目录下面会对应多个日志分段(LogSegment)。LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment索引文件和数据文件。这两个文件的命令规则为:partition全局的第一个segment从0开始,后续每一个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,以下,假设有1000条消息,每一个LogSegment大小为100,下面展示了900-1000的索引和Log:
因为kafka消息数据太大,若是所有创建索引,即占了空间又增长了耗时,因此kafka选择了稀疏索引的方式,这样的话索引能够直接进入内存,加快偏查询速度。
简单介绍一下如何读取数据,若是咱们要读取第911条数据首先第一步,找到他是属于哪一段的,根据二分法查找到他属于的文件,找到0000900.index和00000900.log以后,而后去index中去查找 (911-900) =11这个索引或者小于11最近的索引,在这里经过二分法咱们找到了索引是[10,1367]而后咱们经过这条索引的物理位置1367,开始日后找,直到找到911条数据。
上面讲的是若是要找某个offset的流程,可是咱们大多数时候并不须要查找某个offset,只须要按照顺序读便可,而在顺序读中,操做系统会对内存和磁盘之间添加page cahe,也就是咱们日常见到的预读操做,因此咱们的顺序读操做时速度很快。可是kafka有个问题,若是分区过多,那么日志分段也会不少,写的时候因为是批量写,其实就会变成随机写了,随机I/O这个时候对性能影响很大。因此通常来讲Kafka不能有太多的partition。针对这一点,RocketMQ把全部的日志都写在一个文件里面,就能变成顺序写,经过必定优化,读也能接近于顺序读。
能够思考一下:1.为何须要分区,也就是说主题只有一个分区,难道不行吗?2.日志为何须要分段
1.分区是为了水平扩展 2.日志若是在同一个文件太大会影响性能。若是日志无限增加,查询速度会减慢
Kafka的副本机制是多个服务端节点对其余节点的主题分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其余正常节点(这一过程一般叫Reblance),kafka每一个主题的每一个分区都有一个主副本以及0个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。
在Kafka中并非全部的副本都能被拿来替代主副本,因此在kafka的leader节点中维护着一个ISR(In sync Replicas)集合,翻译过来也叫正在同步中集合,在这个集合中的须要知足两个条件:
节点必须和ZK保持链接
在同步的过程当中这个副本不能落后主副本太多
另外还有个AR(Assigned Replicas)用来标识副本的全集,OSR用来表示因为落后被剔除的副本集合,因此公式以下:ISR = leader + 没有落后太多的副本; AR = OSR+ ISR;
这里先要说下两个名词:HW(高水位)是consumer可以看到的此partition的位置,LEO是每一个partition的log最后一条Message的位置。HW能保证leader所在的broker失效,该消息仍然能够重新选举的leader中获取,不会形成消息丢失。
当producer向leader发送数据时,能够经过request.required.acks参数来设置数据可靠性的级别:
1(默认):这意味着producer在ISR中的leader已成功收到的数据并获得确认后发送下一条message。若是leader宕机了,则会丢失数据。
0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种状况下数据传输效率最高,可是数据可靠性确是最低的。
-1:producer须要等待ISR中的全部follower都确认接收到数据后才算一次发送完成,可靠性最高。可是这样也不能保证数据不丢失,好比当ISR中只有leader时(其余节点都和zk断开链接,或者都没追上),这样就变成了acks=1的状况。
在分布式系统中通常有三种处理语义:
at-least-once:
至少一次,有可能会有屡次。若是producer收到来自ack的确认,则表示该消息已经写入到Kafka了,此时恰好是一次,也就是咱们后面的exactly-once。可是若是producer超时或收到错误,而且request.required.acks配置的不是-1,则会重试发送消息,客户端会认为该消息未写入Kafka。若是broker在发送Ack以前失败,但在消息成功写入Kafka以后,这一次重试将会致使咱们的消息会被写入两次,因此消息就不止一次地传递给最终consumer,若是consumer处理逻辑没有保证幂等的话就会获得不正确的结果。
在这种语义中会出现乱序,也就是当第一次ack失败准备重试的时候,可是第二消息已经发送过去了,这个时候会出现单分区中乱序的现象,咱们须要设置Prouducer的参数max.in.flight.requests.per.connection,flight.requests是Producer端用来保存发送请求且没有响应的队列,保证Producer端未响应的请求个数为1。
at-most-once:
若是在ack超时或返回错误时producer不重试,也就是咱们讲request.required.acks=-1,则该消息可能最终没有写入kafka,因此consumer不会接收消息。
exactly-once:
恰好一次,即便producer重试发送消息,消息也会保证最多一次地传递给consumer。该语义是最理想的,也是最难实现的。在0.10以前并不能保证exactly-once,须要使用consumer自带的幂等性保证。0.11.0使用事务保证了
要实现exactly-once在Kafka 0.11.0中有两个官方策略:
每一个producer在初始化的时候都会被分配一个惟一的PID,对于每一个惟一的PID,Producer向指定的Topic中某个特定的Partition发送的消息都会携带一个从0单调递增的sequence number。
在咱们的Broker端也会维护一个维度为<PID,Topic,Partition>,每次提交一次消息的时候都会对齐进行校验:
若是消息序号比Broker维护的序号大一以上,说明中间有数据还没有写入,也即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
若是消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber
若是消息序号恰好大一,就证实是合法的
上面所说的解决了两个问题:
1.当Prouducer发送了一条消息以后失败,broker并无保存,可是第二条消息却发送成功,形成了数据的乱序。
2.当Producer发送了一条消息以后,broker保存成功,ack回传失败,producer再次投递重复的消息。
上面所说的都是在同一个PID下面,意味着必须保证在单个Producer中的同一个seesion内,若是Producer挂了,被分配了新的PID,这样就没法保证了,因此Kafka中又有事务机制去保证。
在kafka中事务的做用是
实现exactly-once语义
保证操做的原子性,要么所有成功,要么所有失败。
有状态的操做的恢复
事务能够保证就算跨多个<Topic, Partition>,在本次事务中的对消费队列的操做都当成原子性,要么所有成功,要么所有失败。而且,有状态的应用也能够保证重启后从断点处继续处理,也即事务恢复。在kafka的事务中,应用程序必须提供一个惟一的事务ID,即Transaction ID,而且宕机重启以后,也不会发生改变,Transactin ID与PID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明。为了Producer重启以后,旧的Producer具备相同的Transaction ID失效,每次Producer经过Transaction ID拿到PID的同时,还会获取一个单调递增的epoch。因为旧的Producer的epoch比新Producer的epoch小,Kafka能够很容易识别出该Producer是老的Producer并拒绝其请求。为了实现这一点,Kafka 0.11.0.0引入了一个服务器端的模块,名为Transaction Coordinator,用于管理Producer发送的消息的事务性。该Transaction Coordinator维护Transaction Log,该log存于一个内部的Topic内。因为Topic数据具备持久性,所以事务的状态也具备持久性。Producer并不直接读写Transaction Log,它与Transaction Coordinator通讯,而后由Transaction Coordinator将该事务的状态插入相应的Transaction Log。Transaction Log的设计与Offset Log用于保存Consumer的Offset相似。
关于消息队列或者Kafka的一些常见的面试题,经过上面的文章能够提炼出如下几个比较经典的问题:
大部分问题均可以从上面总结后找到答案,若是还不会的话就关注个人公众号,让我为你解答吧。
最后这篇文章被我收录于JGrowing,一个全面,优秀,由社区一块儿共建的Java学习路线,若是您想参与开源项目的维护,能够一块儿共建,github地址为:https://github.com/javagrowing/JGrowing 麻烦给个小星星哟。
打个广告,若是你以为这篇文章对你有文章,能够关注个人技术公众号,你的关注和转发是对我最大的支持,O(∩_∩)O