Apache Kafka是一个分布式流媒体平台。这究竟是什么意思? 流媒体平台具备三个关键功能: 发布和订阅记录流,相似于消息队列或企业消息系统。 以容错的持久方式存储记录流。 记录发生时的处理流。数据库
Kafka一般用于两类普遍的应用:apache
1.创建实时流数据管道,在系统或应用之间可靠地获取数据bootstrap
2.创建对数据流进行转换或反应的实时流应用程序数组
为了理解Kafka是如何作到这些的,让咱们从下到上探究Kafka的能力。bash
首先是几个概念:服务器
Kafka做为一个集群运行在一个或多个能够跨多个数据中心的服务器上。网络
Kafka集群将记录流存储在称为主题的类别中。 每一个记录由一个键、一个值和一个时间戳组成。session
生产者API(Producer API)容许应用程序将记录流发布到一个或多个Kafka主题。多线程
消费者API(Consumer API)容许应用程序订阅一个或多个主题,并处理产生给它们的记录流。架构
流API(Streams API)容许应用充当流处理器,消耗来自一个或多个主题的输入流,并将输出流生成到一个或多个输出主题,有效地将输入流转换为输出流。
链接器API(Connector API)容许创建和运行可重用的生产者或消费者,将Kafka主题链接到现有的应用程序或数据系统。例如,关系数据库的链接器可能会捕获表的每个更改。
让咱们先深刻研究核心抽象Kafka为一条记录流提供的主题。
主题是记录发布的类别或进给名称。Kafka中的主题老是多用户的,也就是说,主题能够有一个或多个订阅该数据的用户。
对于每一个主题,Kafka集群维护一个看起来像这样的分区日志:
每个分区都是一个有序的、不可变的记录序列,它连续地附加到结构化提交日志。每一个分区中的记录都被分配一个连续的ID号,称为惟一地标识分区内的每一个记录的偏移量。
不管是否已经使用可配置的保留周期消耗,全部的已发布记录都能持久地保存Kafka集群。例如,若是保留策略设置为两天,那么在发布记录后的两天,它能够用于消费,以后将被丢弃以释放空间。Kafka的性能相对于数据大小是有效不变的,因此长时间存储数据不是问题。
事实上,在每一个消费者基础上保留的惟一元数据是该用户在日志中的偏移或位置。这种偏移是由消费者控制的:正常状况下,消费者会在读取记录时线性地偏移其偏移量,但事实上,因为该位置是由消费者控制的,因此它能够按它喜欢的任何顺序消耗记录。例如,消费者能够重置为旧的偏移量,以从新处理过去的数据,或者跳过最近的记录并开始从“如今”开始消费。
这种组合的特征意味着Kafka的消费者很是便宜,他们能够来来每每,对集群或其余消费者没有太大的影响。例如,您可使用咱们的命令行工具来“尾随”任何主题的内容,而不改变任何现有消费者所消耗的内容。
日志中的分区有多种用途。首先,它们容许日志超出一个适合于单个服务器的大小。每一个单独的分区必须适合于承载它的服务器,可是一个主题可能有许多分区,所以它能够处理任意数量的数据。其次,它们做为并行的单位更多的是在这一点上。
日志分区被分布在Kafka集群中的服务器上,每一个服务器处理数据并请求共享分区。每一个分区在可配置的多个服务器上复制以容错。
每一个分区都有一个服务器充当“领导者”和零个或多个服务器,充当“追随者”。领导者处理全部的读写请求的分区,而追随者被动复制的领导者。若是领导者失败,其中的一个追随者将自动成为新的领导者。每一个服务器充当一些分区的领导者和其余人的追随者,所以集群内的负载很好地平衡。
Kafka MirrorMaker为您的集群提供地理复制支持。使用镜像机,消息在多个数据中心或云区域上被复制。能够在主动/被动场景中使用此备份和恢复;或者在活动/活动场景中,将数据更靠近用户,或支持数据位置要求。
生产者将数据发布到他们选择的主题中。生产者负责选择要分配给主题内的哪一个分区的记录。这能够在循环的方式下完成,只是为了平衡负载,或者它能够根据一些语义划分函数来完成(例如基于记录中的某些键)。
消费者用消费者组名称来标记本身,而且发布到主题的每一个记录被传递到每一个订阅消费者组中的一个消费者实例。消费者实例能够在单独的进程中或在单独的机器上。
若是全部的消费者实例都具备相同的消费群,那么记录将有效地在消费者实例上进行负载均衡。
若是全部的消费者实例都有不一样的消费群体,那么每一个记录将被广播到全部的消费过程。
两个服务器Kafka集群托管四个分区(P0至P3)与两个消费群体。消费者组A有两个消费者实例,B组有四个。
然而,更常见的是,咱们发现话题有少许的消费群体,每一个都有一个“逻辑用户”。每一个组由许多可扩展性和容错性的消费者实例组成。这仅仅是发布订阅语义,其中订阅服务器是一组消费者而不是单个进程。
在Kafka中实现消费的方法是经过将日志中的分区除以消费者实例,使得每一个实例在任什么时候间点都是分区的“公平共享”的惟一消费者。这个组中的成员保持过程是由Kafka协议动态处理的。若是新实例加入组,它们将从组的其余成员接管一些分区;若是一个实例死亡,它的分区将被分发到其他实例。
Kafka只提供一个分区内的记录的总顺序,而不是在一个主题中的不一样分区之间。每个分区排序结合能力分区数据的关键是足够的大多数应用程序。可是,若是须要对记录进行总排序,则能够用只有一个分区的主题来实现,但这将意味着每一个消费者组只有一个消费者进程。
能够将Kafka部署为多租户解决方案。多租户是经过配置哪些主题能够产生或消耗数据来实现的。也有对配额的操做支持。管理员能够在请求上定义和执行配额,以控制客户端使用的代理资源。
在高级别Kafka提供如下保证:
生产者发送给特定主题分区的消息将按照发送的顺序添加。也就是说,若是记录M1是由与记录M2相同的生产者发送的,M1是最早发送的,那么M1将具备比M2更低的偏移量,而且在日志中出现得更早。
一个用户实例以记录存储在日志中的顺序查看记录。
对于具备复制因子N的主题,咱们将容忍多达N-1服务器故障而不丢失提交到日志的任何记录。
Kafka能够保证同一个分区里的消息是有序的,也就是说生产者按照必定的顺序发送消息,broker就会按照这个顺序把他们写入分区,消费者也会按照一样的顺序读取他们
###### Kafka做为消息传递系统
复制代码
Kafka的流概念如何与传统的企业消息系统相比较?
消息传递传统上有两种模式:排队和发布订阅。在队列中,消费者池能够从服务器读取,而且每一个记录都转到其中一个;在发布订阅中,记录被广播给全部消费者。这两种模式各有其优势和缺点。排队的优势在于,它容许您在多个消费者实例上划分数据的处理,这使得您能够对处理进行缩放。不幸的是,一旦一个进程读取了它的数据,队列就再也不是多用户。发布订阅容许您将数据广播到多个进程,可是因为每一个消息都流向每一个订阅服务器,因此没法进行缩放处理。
Kafka的消费群体概念归纳了这两个概念。与队列同样,消费者组容许您在进程集合(消费者组的成员)上划分处理。与发布订阅同样,Kafka容许您向多个用户组广播消息。
Kafka模型的优势是,每一个主题都具备这些属性,它能够缩放处理,而且也是多用户,不须要选择一个或另外一个。
Kafka比传统的消息传递系统具备更强的排序保证。
传统队列在服务器上保留记录顺序,若是多个用户从队列中消耗,则服务器按其存储的顺序分发记录。然而,尽管服务器按顺序分发记录,可是记录是异步传送给消费者的,所以它们可能会在不一样的消费者之间无序地到达。这实际上意味着在并行消耗的存在下记录的顺序丢失。消息传递系统常常围绕着这一点而工做,它有一个“独占消费者”的概念,它只容许一个进程从队列中消耗,但这固然意味着在处理过程当中没有并行性。
Kafka作得更好。经过在主题中具备并行性分区的概念,Kafka可以在消费者进程池中提供排序保证和负载平衡。这是经过将主题中的分区分配给消费者组中的消费者来实现的,这样每一个分区就被该组中的一个消费者彻底消耗掉。经过这样作,咱们确保消费者是该分区的惟一读取器,并按顺序消耗数据。因为有许多分区,这仍然平衡了许多消费者实例的负载。可是注意,消费者组中的消费者实例不能多于分区。
任何容许发布消息的消息队列与它们之间的解耦都有效地充当了飞行消息的存储系统。Kafka的不一样之处在于它是一个很是好的存储系统。
写入Kafka的数据被写入磁盘并复制用于容错。Kafka容许生产商等待确认,这样写才被认为是完整的,直到它被彻底复制,并保证即便服务器写入失败也会坚持。
磁盘结构Kafka使用规模井Kafka将执行相同的,不管你有50 KB或50 TB的持久性数据在服务器上。
做为认真对待存储并容许客户端控制其读取位置的结果,您能够将Kafka视为专用于高性能、低延迟提交日志存储、复制和传播的专用分布式文件系统。
仅仅读取、写入和存储数据流是不够的,其目的是实现对流的实时处理。
在Kafka中,流处理器是从输入主题获取连续数据流的任何东西,对该输入执行一些处理,并产生连续的数据流到输出主题。
例如,零售应用程序可能会接收销售和出货的输入流,并输出从该数据计算的从新排序和价格调整的流。
能够直接使用生产者和消费者API来进行简单的处理。然而,对于更复杂的转换,Kafka提供了一个彻底集成的流API。这容许构建非平凡处理的应用程序,它们能够计算流中的聚合或一块儿加入流。
这个工具备助于解决这类应用程序面临的难题:处理无序数据、从新处理输入、代码更改、执行状态计算等。
流API构建在Kafka提供的核心原语上:它使用生产者和消费者API来进行输入,使用Kafka进行状态存储,并在流处理器实例中使用相同的组机制来容错。
消息传递、存储和流处理的这种组合看起来是不寻常的,但这对于Kafka做为流媒体平台的角色是相当重要的。
分布式文件系统(如HDFS)容许存储静态文件以进行批量处理。实际上,这样的系统容许存储和处理过去的历史数据。
传统的企业消息系统容许处理订阅后到达的将来消息。以这种方式构建的应用程序在到达时处理将来的数据。
Kafka结合了这两种能力,组合对于Kafka做为流媒体应用平台以及流数据管道来讲都是相当重要的。
经过结合存储和低延迟订阅,流式应用程序能够以相同的方式处理过去和未来的数据。也就是说,单个应用程序能够处理历史、存储的数据,而不是在到达最后记录时结束,它能够在未来的数据到达时保持处理。这是包含批处理和消息驱动应用程序的流处理的通常概念。
一样,对于流式数据管道,订阅到实时事件使得使用Kafka用于很是低延迟的流水线是可能的;可是,可靠地存储数据的能力使得它能够用于关键数据,其中必须保证数据的传递或整合。n具备离线系统,只周期性地加载数据,或者能够在长时间内进行维护。流处理设施使得在到达时转换数据成为可能。
生产者归纳
Kafka发送消息的主要步骤 咱们从建立一个ProducerRecord对象开始,ProducerRecord对象须要包含目标主题和要发送的内容.咱们还能够指定键分区,在发送ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样他们才能在网络上传输. 接下来,数据被传给分区器,若是以前在ProducerRecord对象里指定了分区,那么分区器就不会再作任何事情,直接把指定的分区返回.若是没有指定分区,那么分区器就会根据ProducerRecord对象的键来选择一个分区.选好分区之后,生产者就知道该往哪一个主题和分区发送这条消息了. 紧接着,这条记录被添加到一个记录批次里,这个批次里的全部消息会被发送到相同的主题和分区上.有一个独立的线程负责把这些记录批次发送到相应的broker上. 服务器在收到这些消息时会返回一个响应.若是消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量.若是写入失败,则返回一个错误.生产者在收到错误以后会尝试从新发送消息,几回以后若是仍是失败,就返回错误信息.
要往Kafka写入消息,首先要建立一个生产者对象,并设置一些属性,Kafka生产者有3个必选属性(什么意思不做说明)
bootstrap.servers key.serializer value.serializer 其余默认属性: acks buffer.memory compression.type retries batch.size linger.ms client.id max.in.flight.requests.oer.connection timeout.ms,request.timeout.ms和metadata.fetch.timeout.ms max.block.ms max.request.size resive.buffer.bytes和send.buffer.bytes
下面代码片断演示了如何建立一个新生产者,这里只指定了必要的属性,其余使用默认设置
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
复制代码
上述代码片断主要意思是 先新建一个Properties对象 由于咱们打算把键和值定义成字符串类型,因此使用内置的StringSerializer 在这里咱们建立一个新的生产者对象,并为键和值设置了恰当的类型,而后把Properties对象传给他
实例化生产者对象后,接下来就能够开始发送消息了.发送消息主要有以3种方式,
发送并忘记(fire-and-forger): 咱们把消息发送给服务器,但并不关心他是否正常到达.大多数状况下消息会正常到达,由于Kafaka是高可用的,并且生产者会自动尝试重发,不过使用这种方式有时候也会丢失一些消息 同步发送send(): 咱们使用send()方式发送消息,他会返回一个Future对象,调用get()方法进行等待,就能够知道消息是否发送成功 异步发送send(): 咱们使用send()方式发送消息,并指定一个回调函数,服务器在返回响应时调用该函数
上面的例子使用的都是单线程,但其实生产者是可使用多线程来发送消息的刚开始的时候可使用单个消费者和单个线程.若是须要更高的吞吐量,能够在生产者数量不变的状况下增长线程数量.若是这样还不够,能够增长生产者数量
最简单消息发送方式以下所示
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry":"Precision Products","France");
try{
proucer.send(record);
} catch (Exception e){
e.printStackTrance();
}
复制代码
生产者的send() 方法将ProducerRecord 对象做为参数,因此咱们要先建立一个ProducerRecord对象.ProducerRecord有多个构造函数,这里只使用其中的一个,他须要目标主题的名字 和要发送的键和值对象 ,他们都是字符串.键和值对象的了新鲜感必须与序列化器的生产者对象相匹配
咱们使用生产者的send()方法发送ProducerRecord对象.从生产者的架构图能够看到,消息是先被放进缓冲区,而后使用单独的线程发送到服务器端.send()方法会返回一个包含RecordMetadata的Future对象,不过咱们会忽略返回值,因此没法知道消息是否发送成功,若是不关心发送结果,那么可使用这种发送方式.好比 记录Twitter消息日志,或记录不过重要的应用程序日志
最简单的同步发送消息方式以下所示
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry","Precision Products","France");
try{
producer.send(record).get();
} catch (Exception e){
e.printStackTrance();
}
复制代码
在这里 producer.send() 方法先返回一个Future对象,而后调用Future对象的get()方法等待Kafka响应,若是服务器返回错误,get()方法会抛异常.若是没有发生错误,咱们会获得一个RecordMetadata对象 能够用他来获取消息的偏移量 若是在发送数据以前或在发送过程当中发生了任何错误.好比broker返回一个不容许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常.咱们只是简单的把异常信息打印出来
假设消息在应用程序和Kafka集群之间一个来回须要10ms. 若是在发送完每一个消息后都等待回应,那么发送100个消息须要1秒 但若是只发送消息而不等待响应,那么发送100个消息所须要的时间会少不少.大多数时候,咱们并不须要等待响应,尽管Kafka会把目标主题,分区信息和消息的偏移量发送回来,但对于发送端的应用程序来讲不是必需的.不过在遇到消息发送失败时,咱们须要抛出异常,记录错误日志,或把消息写入错误消息文件
为了在异步发送消息的同时可以对异常状况进行处理,生产者提供了回调支持,下面是使用回调的一个例子
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata,Exception e){
if (e != null){
e.prinStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry","Biomedical Materials","USA");
producer.send(record,new DemoProducerCallback());
复制代码
为了使用回调,须要实现org.apach.kafka.clients.producer.Callback接口的类,这个接口只有一个onCompletion方法 若是 Kafka返回一个错误 onCompletion方法会抛一个非空(non null)异常 在发送消息时传进去一个回调对象
在了解如何从Kafaka读取消息以前,咱们先先了解一下消费者和消费者群组的概念 假设咱们有一个应用程序要从一个Kafka主题读取消息并验证这些消息,而后把他们存储起来,应用程序须要建立一个消费者对象,订阅主题并开始接收消息,而后验证消息并保存结果,过了一阵子 生产者让主题写入消息的速度超过了应用程序验证数据的速度,这个时候该怎么办?若是只使用单个消费者处理消息,应用程序永远跟不上消息的生成速度,这个时候就须要像多个生产者能够向相同的主题写入消息同样,咱们也须要使用多个消费者从同一个主题读取消息,对消息进行分流.Kafka 消费者从属于消费者群组,一个群组里的消费者订阅的是同一个主题,每一个消费者接收主题一部分分区的消息,分区的一个主题消息会被不一样消费组订阅,一个消息只能被每一个消费者群组中的一个消费者接收.
群组里的消费者共同读取主题的分区,一个新的消费者加入群组时,他读取的是本来由其余消费者读取的消息.当一个消费者被关闭或者发生崩溃时,他就离开群组,本来由他读取的分区将有群组里的其余消费者来读取,在主题发生变化时,好比管理员添加新的分区,会发生分区重分配.分区的全部权从一个消费者转移到另一个消费者这样的行为称做再均衡 程序如何触发再均衡? 消费者经过向群组协调器的broker发送心跳来维持他们和群组的从属关系以及他们对分区的全部权关系,只要消费者能够以正常的的时间间隔发送心跳就被认为是活跃的说明他还在读取分区的消息.消费者会在轮询消息或者提交偏移量时发送心跳.若是消费者中止发送心跳的时间足够长,会话过时,群组协调器认为他已经死了,就会触发一次再均衡.
{
Properties props = new Properties();
props.put("bootstrap.servers","broker1:9092,broker2:9092");
props.put("group.id","CountryCounter");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
}
复制代码
consumer.subscribe(Collection.singletonList("customerCountries"));//主题名"customerCountries"
复制代码
fetch.min.bytes fetch.max.wait.ms max.partition.fetch.bytes session.timeout.ms auto.offset.reset enable.auto.commit partition.assignment.strategy client.id max.poll.records receive.buffer.bytes/send.buffer.bytes
到这里Kafka的基础内容已经介绍完了 ,若是想深刻了解这些是远远不够的 在这里能够推荐几本书给你们 若是想对Kafka的总体有深入的认识能够读<<Kafka权威指南>>必读 其次就是 <<Kafka技术内幕>> 这两本书读完 几本就OK了 最后也能够读<<Apache Kafka 源码剖析>>不建议读 固然官网读英文文档最好了