1、基本概念java
介绍node
Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具备本身独特的设计。apache
这个独特的设计是什么样的呢?api
首先让咱们看几个基本的消息系统术语:缓存
Kafka将消息以topic为单位进行概括。服务器
将向Kafka topic发布消息的程序成为producers.网络
将预订topics并消费消息的程序成为consumer.session
Kafka以集群的方式运行,能够由一个或多个服务组成,每一个服务叫作一个broker.数据结构
producers经过网络将消息发送到Kafka集群,集群向消费者提供消息,以下图所示:架构
客户端和服务端经过TCP协议通讯。Kafka提供了Java客户端,而且对多种语言都提供了支持。
Topics 和Logs
先来看一下Kafka提供的一个抽象概念:topic.
一个topic是对一组消息的概括。对每一个topic,Kafka 对它的日志进行了分区,以下图所示:
每一个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。分区中的每一个消息都有一个连续的序列号叫作offset,用来在分区中惟一的标识这个消息。
在一个可配置的时间段内,Kafka集群保留全部发布的消息,无论这些消息有没有被消费。好比,若是消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是能够被消费的。以后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,因此保留太多的数据并非问题。
实际上每一个consumer惟一须要维护的数据是消息在日志中的位置,也就是offset.这个offset有consumer来维护:通常状况下随着consumer不断的读取消息,这offset的值不断增长,但其实consumer能够以任意的顺序读取消息,好比它能够将offset设置成为一个旧的值来重读以前的消息。
以上特色的结合,使Kafka consumers很是的轻量级:它们能够在不对集群和其余consumer形成影响的状况下读取消息。你可使用命令行来"tail"消息而不会对其余正在消费消息的consumer形成影响。
将日志分区能够达到如下目的:首先这使得每一个日志的数量不会太大,能够在单个服务上保存。另外每一个分区能够单独发布和消费,为并发操做topic提供了一种可能。
分布式
每一个分区在Kafka集群的若干服务中都有副本,这样这些持有副本的服务能够共同处理数据和请求,副本数量是能够配置的。副本使Kafka具有了容错能力。
每一个分区都由一个服务器做为“leader”,零或若干服务器做为“followers”,leader负责处理消息的读和写,followers则去复制leader.若是leader down了,followers中的一台则会自动成为leader。集群中的每一个服务都会同时扮演两个角色:做为它所持有的一部分分区的leader,同时做为其余分区的followers,这样集群就会据有较好的负载均衡。
Producers
Producer将消息发布到它指定的topic中,并负责决定发布到哪一个分区。一般简单的由负载均衡机制随机选择分区,但也能够经过特定的分区函数选择分区。使用的更多的是第二种。
Consumers
发布消息一般有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers能够同时从服务端读取消息,每一个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到全部的consumer中。Consumers能够加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer能够在不一样的程序中,也能够在不一样的机器上。若是全部的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。若是全部的consumer都不在不一样的组中,这就成为了发布-订阅模式,全部的消息都被分发到全部的consumer中。更常见的是,每一个topic都有若干数量的consumer组,每一个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每一个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。
由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个
相比传统的消息系统,Kafka能够很好的保证有序性。
传统的队列在服务器上保存有序的消息,若是多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,可是消息是被异步的分发到各consumer上,因此当消息到达时可能已经失去了原来的顺序,这意味着并发消费将致使顺序错乱。为了不故障,这样的消息系统一般使用“专用consumer”的概念,其实就是只容许一个消费者消费消息,固然这就意味着失去了并发性。
在这方面Kafka作的更好,经过分区的概念,Kafka能够在多个consumer组并发的状况下提供较好的有序性和负载均衡。将每一个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就能够顺序的消费这个分区的消息。由于有多个分区,依然能够在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就容许多少并发消费。
Kafka只能保证一个分区以内消息的有序性,在不一样的分区之间是不能够的,这已经能够知足大部分应用的需求。若是须要topic中全部消息的有序性,那就只能让这个topic只有一个分区,固然也就只有一个consumer组消费它。
2、环境搭建
Step 1: 下载Kafka
点击下载最新的版本并解压.
> tar -xzf kafka_2.9.2-0.8.1.1.tgz
> cd kafka_2.9.2-0.8.1.1
复制代码
Step 2: 启动服务
Kafka用到了Zookeeper,全部首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。能够在命令的结尾加个&符号,这样就能够启动后离开控制台。
> bin/zookeeper-server-start.sh config/zookeeper.properties &
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
复制代码
如今启动Kafka:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
复制代码
Step 3: 建立 topic
建立一个叫作“test”的topic,它只有一个分区,一个副本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
复制代码
能够经过list命令查看建立的topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
复制代码
除了手动建立topic,还能够配置broker让它自动建立topic.
Step 4:发送消息.
Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。
运行producer并在控制台中输一些消息,这些消息将被发送到服务端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a messageThis is another message
复制代码
ctrl+c能够退出发送。
Step 5: 启动consumer
Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一个命令行consumer能够读取消息并输出到标准输出:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
复制代码
你在一个终端中运行consumer命令行,另外一个终端中运行producer命令行,就能够在一个终端输入消息,另外一个终端读取消息。
这两个命令都有本身的可选参数,能够在运行的时候不加任何参数能够看到帮助信息。
Step 6: 搭建一个多个broker的集群
刚才只是启动了单个broker,如今启动有3个broker组成的集群,这些broker节点也都是在本机上的:
首先为每一个节点编写配置文件:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
复制代码
在拷贝出的新文件中添加如下参数:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
复制代码
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
复制代码
broker.id在集群中惟一的标注一个节点,由于在同一个机器上,因此必须制定不一样的端口和日志文件,避免数据被覆盖。
We already have Zookeeper and our single node started, so we just need to start the two new nodes:
刚才已经启动可Zookeeper和一个节点,如今启动另外两个节点:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
复制代码
建立一个拥有3个副本的topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
复制代码
如今咱们搭建了一个集群,怎么知道每一个节点的信息呢?运行“"describe topics”命令就能够了:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
复制代码
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
复制代码
下面解释一下这些输出。第一行是对全部分区的一个描述,而后每一个分区都会对应一行,由于咱们只有一个分区因此下面就只加了一行。
leader:负责处理消息的读和写,leader是从全部节点中随机选择的.
replicas:列出了全部的副本节点,无论节点是否在服务中.
isr:是正在服务中的节点.
在咱们的例子中,节点1是做为leader运行。
向topic发送消息:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
复制代码
...
my test message 1my test message 2^C
复制代码
消费这些消息:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
测试一下容错能力.Broker 1做为leader运行,如今咱们kill掉它:
> ps | grep server-1.properties7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
> kill -9 7564
复制代码
另一个节点被选作了leader,node 1 再也不出如今 in-sync 副本列表中:
> bin/kafka-topics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
复制代码
虽然最初负责续写消息的leader down掉了,但以前的消息仍是能够消费的:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
复制代码
看来Kafka的容错机制仍是不错的。
3、搭建Kafka开发环境
咱们搭建了kafka的服务器,并可使用Kafka的命令行工具建立topic,发送和接收消息。下面咱们来搭建kafka的开发环境。
添加依赖
搭建开发环境须要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了。不过咱们使用另外一种更加流行的方式:使用maven管理jar包依赖。
建立好maven项目后,在pom.xml中添加如下依赖:
org.apache.kafka
kafka_2.10
0.8.0
复制代码
添加依赖后你会发现有两个jar包的依赖找不到。不要紧我都帮你想好了,点击这里下载这两个jar包,解压后你有两种选择,第一种是使用mvn的install命令将jar包安装到本地仓库,另外一种是直接将解压后的文件夹拷贝到mvn本地仓库的com文件夹下,好比个人本地仓库是d:\mvn,完成后个人目录结构是这样的:
配置程序
首先是一个充当配置文件做用的接口,配置了Kafka的各类链接参数:
package com.sohu.kafkademon;
public interface KafkaProperties
{
final static String zkConnect = "10.22.10.139:2181";
final static String groupId = "group1";
final static String topic = "topic1";
final static String kafkaServerURL = "10.22.10.139";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 20000;
final static int reconnectInterval = 10000;
final static String topic2 = "topic2";
final static String topic3 = "topic3";
final static String clientId = "SimpleConsumerDemoClient";
}
复制代码
producer
package com.sohu.kafkademon;
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* @author leicui bourne_cui@163.com
*/
public class KafkaProducer extends Thread
{
private final kafka.javaapi.producer.Producer producer;
private final String topic;
private final Properties props = new Properties();
public KafkaProducer(String topic)
{
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "10.22.10.139:9092");
producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props));
this.topic = topic;
}
public void run() {
int messageNo = 1;
while (true)
{
String messageStr = new String("Message_" + messageNo);
System.out.println("Send:" + messageStr);
producer.send(new KeyedMessage(topic, messageStr));
messageNo++;
try {
sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
复制代码
consumer
package com.sohu.kafkademon;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
* @author leicui bourne_cui@163.com
*/
public class KafkaConsumer extends Thread
{
private final ConsumerConnector consumer;
private final String topic;
public KafkaConsumer(String topic)
{
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig()
{
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
@Override
public void run() {
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(1));
Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream stream = consumerMap.get(topic).get(0);
ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
System.out.println("receive:" + new String(it.next().message()));
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
复制代码
简单的发送接收
运行下面这个程序,就能够进行简单的发送接收消息了:
package com.sohu.kafkademon;
/**
* @author leicui bourne_cui@163.com
*/
public class KafkaConsumerProducerDemo
{
public static void main(String[] args)
{
KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
producerThread.start();
KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
consumerThread.start();
}
}
复制代码
高级别的consumer
下面是比较负载的发送接收的程序:
package com.sohu.kafkademon;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
* @author leicui bourne_cui@163.com
*/
public class KafkaConsumer extends Thread
{
private final ConsumerConnector consumer;
private final String topic;
public KafkaConsumer(String topic)
{
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig()
{
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
@Override
public void run() {
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(1));
Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream stream = consumerMap.get(topic).get(0);
ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
System.out.println("receive:" + new String(it.next().message()));
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4、数据持久化
不要畏惧文件系统!
Kafka大量依赖文件系统去存储和缓存消息。对于硬盘有个传统的观念是硬盘老是很慢,这使不少人怀疑基于文件系统的架构可否提供优异的性能。实际上硬盘的快慢彻底取决于使用它的方式。设计良好的硬盘架构能够和内存同样快。
在6块7200转的SATA RAID-5磁盘阵列的线性写速度差很少是600MB/s,可是随即写的速度倒是100k/s,差了差很少6000倍。现代的操做系统都对次作了大量的优化,使用了 read-ahead 和 write-behind的技巧,读取的时候成块的预读取数据,写的时候将各类微小琐碎的逻辑写入组织合并成一次较大的物理写入。对此的深刻讨论能够查看这里,它们发现线性的访问磁盘,不少时候比随机的内存访问快得多。
为了提升性能,现代操做系统每每使用内存做为磁盘的缓存,现代操做系统乐于把全部空闲内存用做磁盘缓存,虽然这可能在缓存回收和从新分配时牺牲一些性能。全部的磁盘读写操做都会通过这个缓存,这不太可能被绕开除非直接使用I/O。因此虽然每一个程序都在本身的线程里只缓存了一份数据,但在操做系统的缓存里还有一份,这等于存了两份数据。
另外再来讨论一下JVM,如下两个事实是众所周知的:
•Java对象占用空间是很是大的,差很少是要存储的数据的两倍甚至更高。
•随着堆中数据量的增长,垃圾回收回变的愈来愈困难。
基于以上分析,若是把数据缓存在内存里,由于须要存储两份,不得不使用两倍的内存空间,Kafka基于JVM,又不得不将空间再次加倍,再加上要避免GC带来的性能影响,在一个32G内存的机器上,不得不使用到28-30G的内存空间。而且当系统重启的时候,又必需要将数据刷到内存中( 10GB 内存差很少要用10分钟),就算使用冷刷新(不是一次性刷进内存,而是在使用数据的时候没有就刷到内存)也会致使最初的时候新能很是慢。可是使用文件系统,即便系统重启了,也不须要刷新数据。使用文件系统也简化了维护数据一致性的逻辑。
因此与传统的将数据缓存在内存中而后刷到硬盘的设计不一样,Kafka直接将数据写到了文件系统的日志中。
常量时间的操做效率
在大多数的消息系统中,数据持久化的机制每每是为每一个cosumer提供一个B树或者其余的随机读写的数据结构。B树固然是很棒的,可是也带了一些代价:好比B树的复杂度是O(log N),O(log N)一般被认为就是常量复杂度了,但对于硬盘操做来讲并不是如此。磁盘进行一次搜索须要10ms,每一个硬盘在同一时间只能进行一次搜索,这样并发处理就成了问题。虽然存储系统使用缓存进行了大量优化,可是对于树结构的性能的观察结果却代表,它的性能每每随着数据的增加而线性降低,数据增加一倍,速度就会下降一倍。
直观的讲,对于主要用于日志处理的消息系统,数据的持久化能够简单的经过将数据追加到文件中实现,读的时候从文件中读就行了。这样作的好处是读和写都是 O(1) 的,而且读操做不会阻塞写操做和其余操做。这样带来的性能优点是很明显的,由于性能和数据的大小没有关系了。
既然可使用几乎没有容量限制(相对于内存来讲)的硬盘空间创建消息系统,就能够在没有性能损失的状况下提供一些通常消息系统不具有的特性。好比,通常的消息系统都是在消息被消费后当即删除,Kafka却能够将消息保存一段时间(好比一星期),这给consumer提供了很好的机动性和灵活性,这点在从此的文章中会有详述。
5、消息传输的事务定义
以前讨论了consumer和producer是怎么工做的,如今来讨论一下数据传输方面。数据传输的事务定义一般有如下三种级别:
最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
精确的一次(Exactly once): 不会漏传输也不会重复传输,每一个消息都传输被一次并且仅仅被传输一次,这是你们所指望的。
大多数消息系统声称能够作到“精确的一次”,可是仔细阅读它们的的文档能够看到里面存在误导,好比没有说明当consumer或producer失败时怎么样,或者当有多个consumer并行时怎么样,或写入硬盘的数据丢失时又会怎么样。kafka的作法要更先进一些。当发布消息时,Kafka有一个“committed”的概念,一旦消息被提交了,只要消息被写入的分区的所在的副本broker是活动的,数据就不会丢失。关于副本的活动的概念,下节文档会讨论。如今假设broker是不会down的。
若是producer发布消息时发生了网络错误,但又不肯定实在提交以前发生的仍是提交以后发生的,这种状况虽然不常见,可是必须考虑进去,如今Kafka版本尚未解决这个问题,未来的版本正在努力尝试解决。
并非全部的状况都须要“精确的一次”这样高的级别,Kafka容许producer灵活的指定级别。好比producer能够指定必须等待消息被提交的通知,或者彻底的异步发送消息而不等待任何通知,或者仅仅等待leader声明它拿到了消息(followers没有必要)。
如今从consumer的方面考虑这个问题,全部的副本都有相同的日志文件和相同的offset,consumer维护本身消费的消息的offset,若是consumer不会崩溃固然能够在内存中保存这个值,固然谁也不能保证这点。若是consumer崩溃了,会有另一个consumer接着消费消息,它须要从一个合适的offset继续处理。这种状况下能够有如下选择:
consumer能够先读取消息,而后将offset写入日志文件中,而后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。
consumer能够先读取消息,处理消息,最后记录offset,固然若是在记录offset以前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”。
“精确一次”能够经过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功以后再提交一次。可是还有个更简单的作法:将消息的offset和消息被处理后的结果保存在一块儿。好比用Hadoop ETL处理消息时,将处理后的结果和offset同时保存在HDFS中,这样就能保证消息和offser同时被处理了。
完整的项目源码来源 欢迎你们一块儿学习研究相关技术,源码获取请加求求:2670716182