什么业务场景,这个业务场景有个什么技术挑战,若是不用MQ可能会很麻烦,可是你如今用了MQ以后带给了你不少的好处。消息队列的常见使用场景,其实场景有不少,可是比较核心的有3个:解耦、异步、削峰。linux
A系统发送个数据到BCD三个系统,接口调用发送,那若是E系统也要这个数据呢?那若是C系统如今不须要了呢?如今A系统又要发送第二种数据了呢?并且A系统要时时刻刻考虑BCDE四个系统若是挂了咋办?要不要重发?我要不要把消息存起来?算法
你须要去考虑一下你负责的系统中是否有相似的场景,就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。可是其实这个调用是不须要直接同步调用接口的,若是用MQ给他异步化解耦,也是能够的,你就须要去考虑在你的项目里,是否是能够运用这个MQ去进行系统的解耦。数据库
A系统接收一个请求,须要在本身本地写库,还须要在BCD三个系统写库,本身本地写库要30ms,BCD三个系统分别写库要300ms、450ms、200ms。最终请求总延时是30 + 300 + 450 + 200 = 980ms,接近1s,异步后,BCD三个系统分别写库的时间,A系统就再也不考虑了。api
天天0点到16点,A系统风平浪静,每秒并发请求数量就100个。结果每次一到16点~23点,每秒并发请求数量忽然会暴增到1万条。可是系统最大的处理能力就只能是每秒钟处理1000个请求啊。怎么办?须要咱们进行流量的削峰,让系统能够平缓的处理突增的请求。数组
优势上面已经说了,就是在特殊场景下有其对应的好处,解耦、异步、削峰。缓存
缺点呢? 网络
系统引入的外部依赖越多,越容易挂掉,原本你就是A系统调用BCD三个系统的接口就行了,ABCD四个系统好好的,没啥问题,你偏加个MQ进来,万一MQ挂了怎么办?MQ挂了,整套系统崩溃了,业务也就停顿了。session
硬生生加个MQ进来,怎么保证消息没有重复消费?怎么处理消息丢失的状况?怎么保证消息传递的顺序性? 架构
A系统处理完了直接返回成功了,人都觉得你这个请求就成功了;可是问题是,要是BCD三个系统那里,BD两个系统写库成功了,结果C系统写库失败了,你这数据就不一致了。并发
因此消息队列实际是一种很是复杂的架构,你引入它有不少好处,可是也得针对它带来的坏处作各类额外的技术方案和架构来规避掉。
消息发送端应用的消息重复发送,有如下几种状况。
l 消息发送端发送消息给消息中间件,消息中间件收到消息并成功存储,而这时消息中间件出现了问题,致使应用端没有收到消息发送成功的返回于是进行重试产生了重复。
l 消息中间件由于负载高响应变慢,成功把消息存储到消息存储中后,返回“成功”这个结果时超时。
l 消息中间件将消息成功写入消息存储,在返回结果时网络出现问题,致使应用发送端重试,而重试时网络恢复,由此致使重复。
能够看到,经过消息发送端产生消息重复的主要缘由是消息成功进入消息存储后,由于各类缘由使得消息发送端没有收到“成功”的返回结果,而且又有重试机制,于是致使重复。
消息到达了消息存储,由消息中间件进行向外的投递时产生重复,有如下几种状况。
l 消息被投递到消息接收者应用进行处理,处理完毕后应用出问题了,消息中间件不知道消息处理结果,会再次投递。
l 消息被投递到消息接收者应用进行处理,处理完毕后网络出现问题了,消息中间件没有收到消息处理结果,会再次投递。
l 消息被投递到消息接收者应用进行处理,处理时间比较长,消息中间件由于消息超时会再次投递。
l 消息被投递到消息接收者应用进行处理,处理完毕后消息中间件出问题了,没能收到消息结果并处理,会再次投递
l 消息被投递到消息接收者应用进行处理,处理完毕后消息中间件收到结果可是遇到消息存储故障,没能更新投递状态,会再次投递。
能够看到,在投递过程当中产生的消息重复接收主要是由于消息接收者成功处理完消息后,消息中间件不能及时更新投递状态形成的。
那么有什么办法能够解决呢?主要是要求消息接收者来处理这种重复的状况,也就是要求消息接收者的消息处理是幂等操做。
对于消息接收端的状况,幂等的含义是采用一样的输入屡次调用处理函数,获得一样的结果。例如,一个SQL操做
update stat_table set count= 10 where id =1
这个操做屡次执行,id等于1的记录中的 count字段的值都为10,这个操做就是幂等的,咱们不用担忧这个操做被重复。
再来看另一个SQL操做
update stat_table set count= count +1 where id= 1;
这样的SQL操做就不是幂等的,一旦重复,结果就会产生变化。
所以应对消息重复的办法是,使消息接收端的处理是一个幂等操做。这样的作法下降了消息中间件的总体复杂性,不过也给使用消息中间件的消息接收端应用带来了必定的限制和门槛。
多版本并发控制,乐观锁的一种实现,在生产者发送消息时进行数据更新时须要带上数据的版本号,消费者去更新时须要去比较持有数据的版本号,版本号不一致的操做没法成功。例如博客点赞次数自动+1的接口:
public boolean addCount(Long id, Long version);
update blogTable set count= count+1,version=version+1 where id=321 and version=123
每个version只有一次执行成功的机会,一旦失败了生产者必须从新获取数据的最新版本号再次发起更新。
利用数据库表单的特性来实现幂等,经常使用的一个思路是在表上构建惟一性索引,保证某一类数据一旦执行完毕,后续一样的请求再也不重复处理了(利用一张日志表来记录已经处理成功的消息的ID,若是新到的消息ID已经在日志表中,那么就再也不处理这条消息。)
以电商平台为例子,电商平台上的订单id就是最适合的token。当用户下单时,会经历多个环节,好比生成订单,减库存,减优惠券等等。每个环节执行时都先检测一下该订单id是否已经执行过这一步骤,对未执行的请求,执行操做并缓存结果,而对已经执行过的id,则直接返回以前的执行结果,不作任何操做。这样能够在最大程度上避免操做的重复执行问题,缓存起来的执行结果也能用于事务的控制等。
要保证消息的可靠性,除了消息的持久化,还包括两个方面,一是生产者发送的消息能够被ActiveMQ收到,二是消费者收到了ActiveMQ发送的消息。
非持久化又不在事务中的消息,可能会有消息的丢失。为保证消息能够被ActiveMQ收到,咱们应该采用事务消息或持久化消息。
对消息的确认有4种机制
一、 AUTO_ACKNOWLEDGE = 1 自动确认
二、 CLIENT_ACKNOWLEDGE = 2 客户端手动确认
三、 DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
四、 SESSION_TRANSACTED = 0 事务提交并确认
ACK_MODE描述了Consumer与broker确认消息的方式(时机),好比当消息被Consumer接收以后,Consumer将在什么时候确认消息。因此ack_mode描述的不是producer于broker之间的关系,而是customer于broker之间的关系。
对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,经过ACK,能够在consumer与Broker之间创建一种简单的“担保”机制.
自动确认
“同步”(receive)方法返回message给消息时会当即确认。
在"异步"(messageListener)方式中,将会首先调用listener.onMessage(message),若是onMessage方法正常结束,消息将会正常确认。若是onMessage方法异常,将致使消费者要求ActiveMQ重发消息。
客户端手动确认,这就意味着AcitveMQ将不会“自做主张”的为你ACK任何消息,开发者须要本身择机确认。
咱们能够在当前消息处理成功以后,当即调用message.acknowledge()方法来"逐个"确认消息,这样能够尽量的减小因网络故障而致使消息重发的个数;固然也能够处理多条消息以后,间歇性的调用acknowledge方法来一次确认多条消息,减小ack的次数来提高consumer的效率,不过须要自行权衡。
相似于AUTO_ACK确认机制,为自动批量确认而生,并且具备“延迟”确认的特色,ActiveMQ会根据内部算法,在收到必定数量的消息自动进行确认。在此模式下,可能会出现重复消息,何时?当consumer故障重启后,那些还没有ACK的消息会从新发送过来。
当session使用事务时,就是使用此模式。当决定事务中的消息能够确认时,必须调用session.commit()方法,commit方法将会致使当前session的事务中全部消息当即被确认。在事务开始以后的任什么时候机调用rollback(),意味着当前事务的结束,事务中全部的消息都将被重发。固然在commit以前抛出异常,也会致使事务的rollback。
生产者将数据发送到RabbitMQ的时候,可能数据就在半路给搞丢了,由于网络啥的问题,都有可能。此时能够选择用RabbitMQ提供的事务功能,就是生产者发送数据以前开启RabbitMQ事务(channel.txSelect),而后发送消息,若是消息没有成功被RabbitMQ接收到,那么生产者会收到异常报错,此时就能够回滚事务(channel.txRollback),而后重试发送消息;若是收到了消息,那么能够提交事务(channel.txCommit)。可是问题是,RabbitMQ事务机制一搞,基本上吞吐量会下来,由于太耗性能。
因此通常来讲,若是要确保RabbitMQ的消息别丢,能够开启confirm模式,在生产者那里设置开启confirm模式以后,你每次写的消息都会分配一个惟一的id,而后若是写入了RabbitMQ中,RabbitMQ会给你回传一个ack消息,告诉你说这个消息ok了。若是RabbitMQ没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你能够重试。并且你能够结合这个机制本身在内存里维护每一个消息id的状态,若是超过必定时间还没接收到这个消息的回调,那么你能够重发。
事务机制和cnofirm机制最大的不一样在于,事务机制是同步的,你提交一个事务以后会阻塞在那儿,可是confirm机制是异步的,你发送个消息以后就能够发送下一个消息,而后那个消息RabbitMQ接收了以后会异步回调你一个接口通知你这个消息接收到了。
因此通常在生产者这块避免数据丢失,都是用confirm机制的。
就是RabbitMQ本身弄丢了数据,这个你必须开启RabbitMQ的持久化,就是消息写入以后会持久化到磁盘,哪怕是RabbitMQ本身挂了,恢复以后会自动读取以前存储的数据,通常数据不会丢。除非极其罕见的是,RabbitMQ还没持久化,本身就挂了,可能致使少许数据会丢失的,可是这个几率较小。
设置持久化有两个步骤,第一个是建立queue和交换器的时候将其设置为持久化的,这样就能够保证RabbitMQ持久化相关的元数据,可是不会持久化queue里的数据;第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去。必需要同时设置这两个持久化才行,RabbitMQ哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。
并且持久化能够跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘以后,才会通知生产者ack了,因此哪怕是在持久化到磁盘以前,RabbitMQ挂了,数据丢了,生产者收不到ack,你也是能够本身重发的。
哪怕是你给RabbitMQ开启了持久化机制,也有一种可能,就是这个消息写到了RabbitMQ中,可是还没来得及持久化到磁盘上,结果不巧,此时RabbitMQ挂了,就会致使内存里的一点点数据会丢失。
RabbitMQ若是丢失了数据,主要是由于你消费的时候,刚消费到,还没处理,结果进程挂了,好比重启了,那么就尴尬了,RabbitMQ认为你都消费了,这数据就丢了。
这个时候得用RabbitMQ提供的ack机制,简单来讲,就是你关闭RabbitMQ自动ack,能够经过一个api来调用就行,而后每次你本身代码里确保处理完的时候,再程序里ack一把。这样的话,若是你还没处理完,不就没有ack?那RabbitMQ就认为你还没处理完,这个时候RabbitMQ会把这个消费分配给别的consumer去处理,消息是不会丢的。
惟一可能致使消费者弄丢数据的状况,就是说,你那个消费到了这个消息,而后消费者那边自动提交了offset,让kafka觉得你已经消费好了这个消息,其实你刚准备处理这个消息,你还没处理,你本身就挂了,此时这条消息就丢咯。
你们都知道kafka会自动提交offset,那么只要关闭自动提交offset,在处理完以后本身手动提交offset,就能够保证数据不会丢。可是此时确实仍是会重复消费,好比你刚处理完,还没提交offset,结果本身挂了,此时确定会重复消费一次,本身保证幂等性就行了。
生产环境碰到的一个问题,就是说咱们的kafka消费者消费到了数据以后是写到一个内存的queue里先缓冲一下,结果有的时候,你刚把消息写入内存queue,而后消费者会自动提交offset。
而后此时咱们重启了系统,就会致使内存queue里还没来得及处理的数据就丢失了
这块比较常见的一个场景,就是kafka某个broker宕机,而后从新选举partiton的leader时。你们想一想,要是此时其余的follower恰好还有些数据没有同步,结果此时leader挂了,而后选举某个follower成leader以后,他不就少了一些数据?这就丢了一些数据啊。
因此此时通常是要求起码设置以下4个参数:
给这个topic设置replication.factor参数:这个值必须大于1,要求每一个partition必须有至少2个副本。
在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟本身保持联系,没掉队,这样才能确保leader挂了还有一个follower吧。
在producer端设置acks=all:这个是要求每条数据,必须是写入全部replica以后,才能认为是写成功了。
在producer端设置retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
若是按照上述的思路设置了ack=all,必定不会丢,要求是,你的leader接收到消息,全部的follower都同步到了消息以后,才认为本次写成功了。若是没知足这个条件,生产者会自动不断的重试,重试无限次。
从根本上说,异步消息是不该该有顺序依赖的。在MQ上估计是无法解决。要实现严格的顺序消息,简单且可行的办法就是:保证生产者 - MQServer - 消费者是一对一对一的关系。
1、经过高级特性consumer独有消费者(exclusive consumer)
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
当在接收信息的时候,有多个独占消费者的时候,只有一个独占消费者能够接收到消息。
独占消息就是在有多个消费者同时消费一个queue时,能够保证只有一个消费者能够消费消息,这样虽然保证了消息的顺序问题,不过也带来了一个问题,就是这个queue的全部消息将只会在这一个主消费者上消费,其余消费者将闲置,达不到负载均衡分配,而实际业务咱们可能更多的是这样的场景,好比一个订单会发出一组顺序消息,咱们只要求这一组消息是顺序消费的,而订单与订单之间又是能够并行消费的,不须要顺序,由于顺序也没有任何意义,有没有办法作到呢?能够利用activemq的另外一个高级特性之messageGroup
2、利用Activemq的高级特性:messageGroups
Message Groups特性是一种负载均衡的机制。在一个消息被分发到consumer以前,broker首先检查消息JMSXGroupID属性。若是存在,那么broker会检查是否有某个consumer拥有这个message group。若是没有,那么broker会选择一个consumer,并将它关联到这个message group。此后,这个consumer会接收这个message group的全部消息,直到:Consumer被关闭。Message group被关闭,经过发送一个消息,并设置这个消息的JMSXGroupSeq为-1
bytesMessage.setStringProperty("JMSXGroupID", "constact-20100000002");
bytesMessage.setIntProperty("JMSXGroupSeq", -1);
如上图所示,同一个queue中,拥有相同JMSXGroupID的消息将发往同一个消费者,解决顺序问题,不一样分组的消息又能被其余消费者并行消费,解决负载均衡的问题。
若是有顺序依赖的消息,要保证消息有一个hashKey,相似于数据库表分区的的分区key列。保证对同一个key的消息发送到相同的队列。A用户产生的消息(包括建立消息和删除消息)都按A的hashKey分发到同一个队列。只须要把强相关的两条消息基于相同的路由就好了,也就是说通过m1和m2的在路由表里的路由是同样的,那天然m1会优先于m2去投递。并且一个queue只对应一个consumer。
一个topic,一个partition,一个consumer,内部单线程消费
rabbitmq,rabbitmq是能够设置过时时间的,就是TTL,若是消息在queue中积压超过必定的时间,而又没有设置死信队列机制,就会被rabbitmq给清理掉,这个数据就没了。
ActiveMQ则经过更改配置,支持消息的定时发送。
发生了线上故障,几千万条数据在MQ里积压好久。是修复consumer的问题,让他恢复消费速度,而后等待几个小时消费完毕?这是个解决方案。不过有时候咱们还会进行临时紧急扩容。
一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条。1000多万条,因此若是积压了几百万到上千万的数据,即便消费者恢复了,也须要大概1小时的时间才能恢复过来。
通常这个时候,只能操做临时紧急扩容了,具体操做步骤和思路以下:
先修复consumer的问题,确保其恢复消费速度,而后将现有cnosumer都停掉。
新建一个topic,partition是原来的10倍,临时创建好原先10倍或者20倍的queue数量。而后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费以后不作耗时的处理,直接均匀轮询写入临时创建好的10倍数量的queue。
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。
这种作法至关因而临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。
等快速消费完积压数据以后,再恢复原先部署架构,从新用原先的consumer机器来消费消息。
Kafka中每一个Topic都包含一个或多个Partition,不一样Partition可位于不一样节点。同时Partition在物理上对应一个本地文件夹,每一个Partition包含一个或多个Segment,每一个Segment包含一个数据文件和一个与之对应的索引文件。在逻辑上,能够把一个Partition看成一个很是长的数组,可经过这个“数组”的索引(offset)去访问其数据。
一方面,因为不一样Partition可位于不一样机器,所以能够充分利用集群优点,实现机器间的并行处理。另外一方面,因为Partition在物理上对应一个文件夹,即便多个Partition位于同一个节点,也可经过配置让同一节点上的不一样Partition置于不一样的disk drive上,从而实现磁盘间的并行处理,充分发挥多磁盘的优点。
利用多磁盘的具体方法是,将不一样磁盘mount到不一样目录,而后在server.properties中,将log.dirs设置为多目录(用逗号分隔)。Kafka会自动将全部Partition尽量均匀分配到不一样目录也即不一样目录(也即不一样disk)上。
Partition是最小并发粒度,Partition个数决定了可能的最大并行度。。
- RDBMS的读写分离即为典型的Master-Slave方案
- 同步复制可保证强一致性但会影响可用性
- 异步复制可提供高可用性但会下降一致性
- 主要用于去中心化的分布式系统中。
- N表明总副本数,W表明每次写操做要保证的最少写成功的副本数,R表明每次读至少要读取的副本数
- 当W+R>N时,可保证每次读取的数据至少有一个副本拥有最新的数据
- 多个写操做的顺序难以保证,可能致使多副本间的写操做顺序不一致。Dynamo经过向量时钟保证最终一致性
- Google的Chubby,Zookeeper的原子广播协议(Zab),RAFT等
Kafka的数据复制是以Partition为单位的。而多个备份间的数据复制,经过Follower向Leader拉取数据完成。从一这点来说,Kafka的数据复制方案接近于上文所讲的Master-Slave方案。不一样的是,Kafka既不是彻底的同步复制,也不是彻底的异步复制,而是基于ISR的动态复制方案。
ISR,也即In-sync Replica。每一个Partition的Leader都会维护这样一个列表,该列表中,包含了全部与之同步的Replica(包含Leader本身)。每次数据写入时,只有ISR中的全部Replica都复制完,Leader才会将其置为Commit,它才能被Consumer所消费。
这种方案,与同步复制很是接近。但不一样的是,这个ISR是由Leader动态维护的。若是Follower不能紧“跟上”Leader,它将被Leader从ISR中移除,待它又从新“跟上”Leader后,会被Leader再次加加ISR中。每次改变ISR后,Leader都会将最新的ISR持久化到Zookeeper中。
因为Leader可移除不能及时与之同步的Follower,故与同步复制相比可避免最慢的Follower拖慢总体速度,也即ISR提升了系统可用性。
ISR中的全部Follower都包含了全部Commit过的消息,而只有Commit过的消息才会被Consumer消费,故从Consumer的角度而言,ISR中的全部Replica都始终处于同步状态,从而与异步复制方案相比提升了数据一致性。
ISR可动态调整,极限状况下,能够只包含Leader,极大提升了可容忍的宕机的Follower的数量。与Majority Quorum方案相比,容忍相同个数的节点失败,所要求的总节点数少了近一半。
Kafka的整个设计中,Partition至关于一个很是长的数组,而Broker接收到的全部消息顺序写入这个大数组中。同时Consumer经过Offset顺序消费这些数据,而且不删除已经消费的数据,从而避免了随机写磁盘的过程。
因为磁盘有限,不可能保存全部数据,实际上做为消息系统Kafka也不必保存全部数据,须要删除旧的数据。而这个删除过程,并不是经过使用“读-写”模式去修改文件,而是将Partition分为多个Segment,每一个Segment对应一个物理文件,经过删除整个文件的方式去删除Partition内的数据。这种方式清除旧数据的方式,也避免了对文件的随机写操做。
在存储机制上,使用了Log Structured Merge Trees(LSM) 。
注:Log Structured Merge Trees(LSM),谷歌 “BigTable” 的论文,中提出,LSM是当前被用在许多产品的文件结构策略:HBase, Cassandra, LevelDB, SQLite,Kafka。LSM被设计来提供比传统的B+树或者ISAM更好的写操做吞吐量,经过消去随机的本地更新操做来达到这个目标。这个问题的本质仍是磁盘随机操做慢,顺序读写快。这二种操做存在巨大的差距,不管是磁盘仍是SSD,并且快至少三个数量级。
使用Page Cache的好处以下
- I/O Scheduler会将连续的小块写组装成大块的物理写从而提升性能
- I/O Scheduler会尝试将一些写操做从新按顺序排好,从而减小磁盘头的移动时间
- 充分利用全部空闲内存(非JVM内存)。若是使用应用层Cache(即JVM堆内存),会增长GC负担
- 读操做可直接在Page Cache内进行。若是消费和生产速度至关,甚至不须要经过物理磁盘(直接经过Page Cache)交换数据
- 若是进程重启,JVM内的Cache会失效,但Page Cache仍然可用
Broker收到数据后,写磁盘时只是将数据写入Page Cache,并不保证数据必定彻底写入磁盘。从这一点看,可能会形成机器宕机时,Page Cache内的数据未写入磁盘从而形成数据丢失。可是这种丢失只发生在机器断电等形成操做系统不工做的场景,而这种场景彻底能够由Kafka层面的Replication机制去解决。若是为了保证这种状况下数据不丢失而强制将Page Cache中的数据Flush到磁盘,反而会下降性能。也正因如此,Kafka虽然提供了flush.messages和flush.ms两个参数将Page Cache中的数据强制Flush到磁盘,可是Kafka并不建议使用。
若是数据消费速度与生产速度至关,甚至不须要经过物理磁盘交换数据,而是直接经过Page Cache交换数据。同时,Follower从Leader Fetch数据时,也可经过Page Cache完成。
注:Page Cache,又称pcache,其中文名称为页高速缓冲存储器,简称页高缓。page cache的大小为一页,一般为4K。在linux读写文件时,它用于缓存文件的逻辑内容,从而加快对磁盘上映像和数据的访问。 是Linux操做系统的一个特点。
Broker的log.dirs配置项,容许配置多个文件夹。若是机器上有多个Disk Drive,可将不一样的Disk挂载到不一样的目录,而后将这些目录都配置到log.dirs里。Kafka会尽量将不一样的Partition分配到不一样的目录,也即不一样的Disk上,从而充分利用了多Disk的优点。
Kafka中存在大量的网络数据持久化到磁盘(Producer到Broker)和磁盘文件经过网络发送(Broker到Consumer)的过程。这一过程的性能直接影响Kafka的总体吞吐量。
传统模式下的四次拷贝与四次上下文切换
以将磁盘文件经过网络发送为例。传统模式下,通常使用以下伪代码所示的方法先将文件数据读入内存,而后经过Socket将内存中的数据发送出去。
buffer = File.readSocket.send(buffer)
这一过程实际上发生了四次数据拷贝。首先经过系统调用将文件数据读入到内核态Buffer(DMA拷贝),而后应用程序将内存态Buffer数据读入到用户态Buffer(CPU拷贝),接着用户程序经过Socket发送数据时将用户态Buffer数据拷贝到内核态Buffer(CPU拷贝),最后经过DMA拷贝将数据拷贝到NIC Buffer。同时,还伴随着四次上下文切换。
而Linux 2.4+内核经过sendfile系统调用,提供了零拷贝。数据经过DMA拷贝到内核态Buffer后,直接经过DMA拷贝到NIC Buffer,无需CPU拷贝。这也是零拷贝这一说法的来源。除了减小数据拷贝外,由于整个读文件-网络发送由一个sendfile调用完成,整个过程只有两次上下文切换,所以大大提升了性能。
从具体实现来看,Kafka的数据传输经过Java NIO的FileChannel的transferTo和transferFrom方法实现零拷贝。
注: transferTo和transferFrom并不保证必定能使用零拷贝。其实是否能使用零拷贝与操做系统相关,若是操做系统提供sendfile这样的零拷贝系统调用,则这两个方法会经过这样的系统调用充分利用零拷贝的优点,不然并不能经过这两个方法自己实现零拷贝。
批处理是一种经常使用的用于提升I/O性能的方式。对Kafka而言,批处理既减小了网络传输的Overhead,又提升了写磁盘的效率。
Kafka 的send方法并不是当即将消息发送出去,而是经过batch.size和linger.ms控制实际发送频率,从而实现批量发送。
因为每次网络传输,除了传输消息自己之外,还要传输很是多的网络协议自己的一些内容(称为Overhead),因此将多条消息合并到一块儿传输,可有效减小网络传输的Overhead,进而提升了传输效率。
Kafka从0.7开始,即支持将数据压缩后再传输给Broker。除了能够将每条消息单独压缩而后传输外,Kafka还支持在批量发送时,将整个Batch的消息一块儿压缩后传输。数据压缩的一个基本原理是,重复数据越多压缩效果越好。所以将整个Batch的数据一块儿压缩能更大幅度减少数据量,从而更大程度提升网络传输效率。
Broker接收消息后,并不直接解压缩,而是直接将消息以压缩后的形式持久化到磁盘。Consumer Fetch到数据后再解压缩。所以Kafka的压缩不只减小了Producer到Broker的网络传输负载,同时也下降了Broker磁盘操做的负载,也下降了Consumer与Broker间的网络传输量,从而极大得提升了传输效率,提升了吞吐量。
Kafka消息的Key和Payload(或者说Value)的类型可自定义,只需同时提供相应的序列化器和反序列化器便可。所以用户能够经过使用快速且紧凑的序列化-反序列化方式(如Avro,Protocal Buffer)来减小实际网络传输和磁盘存储的数据规模,从而提升吞吐率。这里要注意,若是使用的序列化方法太慢,即便压缩比很是高,最终的效率也不必定高。