Kafka 入门 and kafka+logstash 实战应用

1、基础理论nginx


这块是整个kafka的核心不管你是先操做在来看仍是先看在操做都须要多看几遍。

算法

首先来了解一下Kafka所使用的基本术语apache

Topic
Kafka将消息种子(Feed)分门别类 每一类的消息称之为话题(Topic).
Producer
发布消息的对象称之为话题生产者(Kafka topic producer)
Consumer
订阅消息并处理发布的消息的种子的对象称之为话题消费者(consumers)
Broker
已发布的消息保存在一组服务器中称之为Kafka集群。集群中的每个服务器都是一个代理(Broker). 消费者能够订阅一个或多个话题并从Broker拉数据从而消费这些已发布的消息。

让咱们站的高一点从高的角度来看Kafka集群的业务处理就像这样子

wKiom1fhP-vRkftcAAAfOoQwTtk506.png编程

Client和Server之间的通信是经过一条简单、高性能而且和开发语言无关的TCP协议。除了Java Client外还有很是多的其它编程语言的Client。
bootstrap


话题和日志  (Topic和Log)

让咱们更深刻的了解Kafka中的Topic。bash

Topic是发布的消息的类别或者种子Feed名。对于每个TopicKafka集群维护这一个分区的log就像下图中的示例服务器

wKioL1fhQHSQG62WAAA-QB83hq8979.png

每个分区都是一个顺序的、不可变的消息队列 而且能够持续的添加。分区中的消息都被分配了一个序列号称之为偏移量(offset)在每一个分区中此偏移量都是惟一的。 Kafka集群保持全部的消息直到它们过时 不管消息是否被消费了。 实际上消费者所持有的仅有的元数据就是这个偏移量也就是消费者在这个log中的位置。 这个偏移量由消费者控制正常状况当消费者消费消息的时候偏移量也线性的的增长。可是实际偏移量由消费者控制消费者能够将偏移量重置为更老的一个偏移量从新读取消息。 能够看到这种设计对消费者来讲操做自如 一个消费者的操做不会影响其它消费者对此log的处理。 再说说分区。Kafka中采用分区的设计有几个目的。一是能够处理更多的消息不受单台服务器的限制。Topic拥有多个分区意味着它能够不受限的处理更多的数据。第二分区能够做为并行处理的单元。并发

分布式(Distribution)

Log的分区被分布到集群中的多个服务器上。每一个服务器处理它分到的分区。 根据配置每一个分区还能够复制到其它服务器做为备份容错。 每一个分区有一个leader零或多个follower。Leader处理此分区的全部的读写请求而follower被动的复制数据。若是leader宕机其它的一个follower会被推举为新的leader。 一台服务器可能同时是一个分区的leader另外一个分区的follower。 这样能够平衡负载避免全部的请求都只让一台或者某几台服务器处理。异步

生产者(Producers)

生产者往某个Topic上发布消息。生产者也负责选择发布到Topic上的哪个分区。最简单的方式从分区列表中轮流选择。也能够根据某种算法依照权重选择分区。开发者负责如何选择分区的算法。
elasticsearch

消费者(Consumers)

一般来说消息模型能够分为两种 队列和发布-订阅式。 队列的处理方式是 一组消费者从服务器读取消息一条消息只有其中的一个消费者来处理。在发布-订阅模型中消息被广播给全部的消费者接收到消息的消费者均可以处理此消息。Kafka为这两种模型提供了单一的消费者抽象模型 消费者组 consumer group。 消费者用一个消费者组名标记本身。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如全部的消费者都在一个组中那么这就变成了queue模型。 假如全部的消费者都在不一样的组中那么就彻底变成了发布-订阅模型。 更通用的 咱们能够建立一些消费者组做为逻辑上的订阅者。每一个组包含数目不等的消费者 一个组内多个消费者能够用来扩展性能和容错。正以下图所示

wKiom1fhQUrTOmDcAABf0AAq7-s668.png

  2个kafka集群托管4个分区P0-P32个消费者组消费组A有2个消费者实例消费组B有4个。


正像传统的消息系统同样Kafka保证消息的顺序不变。 再详细扯几句。传统的队列模型保持消息而且保证它们的前后顺序不变。可是 尽管服务器保证了消息的顺序消息仍是异步的发送给各个消费者消费者收到消息的前后顺序不能保证了。这也意味着并行消费将不能保证消息的前后顺序。用过传统的消息系统的同窗确定清楚消息的顺序处理很让人头痛。若是只让一个消费者处理消息又违背了并行处理的初衷。 在这一点上Kafka作的更好尽管并无彻底解决上述问题。 Kafka采用了一种分而治之的策略分区。 由于Topic分区中消息只能由消费者组中的惟一一个消费者处理因此消息确定是按照前后顺序进行处理的。可是它也仅仅是保证Topic的一个分区顺序处理不能保证跨分区的消息前后处理顺序。 因此若是你想要顺序的处理Topic的全部消息那就只提供一个分区。

Kafka的保证(Guarantees)

生产者发送到一个特定的Topic的分区上的消息将会按照它们发送的顺序依次加入


消费者收到的消息也是此顺序


若是一个Topic配置了复制因子( replication facto)为N 那么能够容许N-1服务器宕机而不丢失任何已经增长的消息



Kafka官网

http://kafka.apache.org/


做者半兽人
连接http://orchome.com/5
来源OrcHome
著做权归做者全部。商业转载请联系做者得到受权非商业转载请注明出处。


2、安装和启动


一、下载二进制安装包直接解压

tar xf kafka_2.11-0.10.0.1.tgz
cd kafka_2.11-0.10.0.1


二、启动服务

Kafka须要用到ZooKeepr因此须要先启动一个ZooKeepr服务端若是没有单独的ZooKeeper服务端可使用Kafka自带的脚本快速启动一个单节点ZooKeepr实例

bin/zookeeper-server-start.sh config/zookeeper.properties  # 启动zookeeper服务端实例

bin/kafka-server-start.sh config/server.properties  # 启动kafka服务端实例


3、基本操做指令


一、新建一个主题topic

建立一个名为“test”的Topic只有一个分区和一个备份

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


二、建立好以后能够经过运行如下命令查看已建立的topic信息

bin/kafka-topics.sh --list  --zookeeper localhost:2181


三、发送消息

Kafka提供了一个命令行的工具能够从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。

运行producer生产者,而后在控制台输入几条消息到服务器。

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message


四、消费消息

Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message


五、查看topic详细状况

bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic peiyinlog

wKiom1fh5DiCDyFHAABNw6sr0ok754.png

Topic: 主题名称

Partition: 分片编号

Leader: 该分区的leader节点

Replicas: 该副本存在于哪一个broker节点

Isr: 活跃状态的broker


六、给Topic添加分区

bin/kafka-topics.sh --zookeeper 192.168.90.201:2181 --alter --topic test2 --partitions 20


七、删除Topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name


主题(Topic)删除选项默认是关闭的,须要服务器配置开启它。

delete.topic.enable=true


注:若是须要在其余节点做为客户端使用指令链接kafka broker,则须要注意如下两点(二选一便可)

另 : ( 使用logstash input 链接kafka也须要注意 )


一、设置kafka broker 配置文件中 host.name 参数为监听的IP地址


二、给broker设置一个惟一的主机名,而后在本机/etc/hosts文件配置解析到本身的IP(固然若是有内网的DNS服务器也行),同时还须要在zk server 和 客户端的 /etc/hosts 添加broker主机名的解析。 


缘由详解:


场景假设

broker_server ip 主机名 zookeeper ip 客户端 ip
192.168.1.2  默认 localhost 192.168.1.4 192.168.1.5
# 此时客户端向broker发起一些消费:

bin/kafka-console-consumer.sh --zookeeper 192.168.1.4:2181 --topic test2 --from-beginning


这时客户端链接到zookeeper要求消费数据,zk则返回broker的ip地址和端口给客户端,可是若是broker没有设置host.name 和 advertised.host.name  broker默认返回的是本身的主机名,默认就是localhost和端口9092,这时客户端拿到这个主机名解析到本身,操做失败。


因此,须要配置broker 的host.name参数为监听的IP,这时broker就会返回IP。 客户端就能正常链接了。


或者也能够设置好broker的主机名,而后分别给双方配置好解析。


4、broker基本配置

#  server.properties

broker.id=0  # broker节点的惟一标识 ID 不能重复。
host.name=10.10.4.1  # 监听的地址,若是不设置默认返回主机名给zk_server
log.dirs=/u01/kafka/kafka_2.11-0.10.0.1/data  # 消息数据存放路径
num.partitions=6  # 默认主题(Topic)分片数
log.retention.hours=24  # 消息数据的最大保留时长
zookeeper.connect=10.160.4.225:2181  # zookeeper server 链接地址和端口



5、Logstash + Kafka 实战应用


Logstash-1.51才开始内置Kafka插件,也就是说用以前的logstash版本是须要手动编译Kafka插件的,相信也不多人用了。建议使用2.3以上的logstash版本。


一、使用logstash向kafka写入一些数据


软件版本:

logstash 2.3.2 

kafka_2.11-0.10.0.1


logstash output 部分配置

output {
  kafka {
    workers => 2
    bootstrap_servers => "10.160.4.25:9092,10.160.4.26:9092,10.160.4.27:9092"
    topic_id => "xuexilog"

}

}


参数解释 : 

workers:用于写入时的工做线程

bootstrap_servers:指定可用的kafka broker实例列表

topic_id:指定topic名称,能够在写入前手动在broker建立定义好分片数和副本数,也能够不提早建立,那么在logstash写入时会自动建立topic,分片数和副本数则默认为broker配置文件中设置的。



二、使用logstash消费一些数据,并写入到elasticsearch


软件版本:

logstash 2.3.2 

elasticsearch-2.3.4


logstash 配置文件

input{
    kafka {
        zk_connect => "112.100.6.1:2181,112.100.6.2:2181,112.100.6.3:2181"
        group_id => "logstash"
        topic_id => "xuexilog"
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true

}

}

# 这里group_id 须要解释一下,在Kafka中,相同group的Consumer能够同时消费一个topic,不一样group的Consumer工做则互不干扰。
# 补充: 在同一个topic中的同一个partition同时只能由一个Consumer消费,当同一个topic同时须要有多个Consumer消费时,则能够建立更多的partition。

output {
    if [type] == "nginxacclog" {
        elasticsearch {
            hosts => ["10.10.1.90:9200"]
            index => "logstash-nginxacclog-%{+YYYY.MM.dd}"
            manage_template => true
            flush_size => 50000
            idle_flush_time => 10
            workers => 2
}

}

}


三、经过group_id 查看当前详细的消费状况

bin/kafka-consumer-groups.sh --group logstash --describe --zookeeper 127.0.0.1:2181

wKiom1fiTL-xhYo5AABDZmsbids038.png


输出解释:

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
消费者组 话题id 分区id 当前已消费的条数 总条数 未消费的条数
相关文章
相关标签/搜索