Kafka
是一个分布式消息队列(MQ, Message queue)中间件,支持点对点(Quene)、发布订阅(Topic)模式。Kafka 的定位主要在日志等方面,单击吞吐量特别大, 由于Kafka 设计的初衷就是处理日志的,能够看作是一个日志(消息)系统一个重要组件,针对性很强。php
使用场景:html
官网:http://kafka.apache.org/
中文站:http://kafka.apachecn.org/python
名称: Kafka 所属社区/公司:Apache 开发语言: Java 协议: 自行设计的协议,仿AMQP 事务:不支持 集群:支持,依赖ZooKeeper
官方的 quickstart 已经很是详细了,按照文档能够一步一步的达到入门的效果。地址:http://kafka.apache.org/quickstartgit
这里我记录一下简单的步骤,仅做为测试使用,真实环境请参考官方文档部署:
一、下载解压:github
$ cd /opt $ wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz $ tar -xzf kafka_2.12-2.2.0.tgz $ cd kafka_2.12-2.2.0
Kafka 依赖 ZooKeeper 。安装包里已经包含了 ZooKeeper。golang
二、启动 ZooKeeper面试
$ bin/zookeeper-server-start.sh config/zookeeper.properties # 限于篇幅,省略大部分输出 ... [2019-05-11 13:15:44,643] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
若是须要后台运行,请在命令后面追加
&
。redis
三、启动 Kafka Server端apache
$ bin/kafka-server-start.sh config/server.properties # 限于篇幅,省略大部分输出 ... [2019-05-11 13:18:34,578] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser) [2019-05-11 13:18:34,578] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser) [2019-05-11 13:18:34,579] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
若是须要后台运行,请在命令后面追加
&
。bootstrap
四、建立主题(Topic)
建立一个名为 test 的主题,包含1个分区(partition),1个副本(replication-factor):
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
建立完毕后能够查看该主题:
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092 test
也能够在配置里设置为在发布不存在的主题时自动建立主题,而不是手动建立主题。这个后面再说明。
五、发布消息
咱们新启动一个命令行窗口充当生产者,向 Kafka 里发送消息,指定主题为 test
:
$ cd /opt/kafka_2.12-2.2.0/ $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test >
而后命令行等待咱们输入消息。咱们输入 hello
回车:
>hello >
消息就发出去了。接下来咱们启动消费者。
六、消费消息
咱们新启动一个命令行窗口充当消费者来消费消息,指定主题为 test
:
$ cd /opt/kafka_2.12-2.2.0/ $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning hello
就消费了1条消息。咱们能够在生产者命令行窗口继续发生消息,消费者端能够实时消费。
好了,基本的安装测试就到这。关于设置kakfa集群请参考:http://kafka.apache.org/quickstart#quickstart_multibroker
上一节仅演示了在命令行里使用,能够方便调试。对于在项目里使用,须要借助 SDK。这个页面收录了全部的客户端:https://cwiki.apache.org/confluence/display/KAFKA/Clients
经常使用的SDK:
这里以 kafka-php
为例。
kafka-php
使用纯粹的PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka。最新的kafka-php
版本是v0.2.8
(截止到2019-05-11),详见:https://github.com/weiboad/kafka-php/releases 。kafka-php
的v0.2.x
和v0.1.x
不兼容,若是使用原有的v0.1.x
的能够参照文档 Kafka PHP v0.1.x Document, 不过建议切换到v0.2.x
上。
kafka-php
(v0.2.8
) 环境要求:
一、发送消息,同步方式:
require '../vendor/autoload.php'; date_default_timezone_set('PRC'); // use Monolog\Logger; // //use Monolog\Handler\StdoutHandler; // Create the logger // $logger = new Logger('my_logger'); // //Now add some handlers // $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('127.0.0.1:9192,127.0.0.1:9193'); $config->setBrokerVersion('0.10.2.1'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $config->setTimeout(2000); $producer = new \Kafka\Producer(); // $producer->setLogger($logger); for($i = 0; $i < 100; $i++) { $result = $producer->send(array( array( 'topic' => 'test1', 'value' => 'test1....message.', 'key' => '', ), )); var_dump($result); }
说明:
1) 设置 logger
不是必选的。可是若是须要调试,建议加上。若是没有安装Monolog
,也能够本身定一个 logger ,只要实现了 psr/log
规范便可。
2) MetadataBrokerList
支持集群配置。使用英文逗号隔开便可。
3) BrokerVersion
版本需与安装的 kafka 版本一致。
二、消费消息
消费消息通常须要写脚本常驻运行。能够借助 Supervisor 工具。
require '../vendor/autoload.php'; date_default_timezone_set('PRC'); // use Monolog\Logger; // use Monolog\Handler\StdoutHandler; // // Create the logger // $logger = new Logger('my_logger'); // // Now add some handlers // $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); //消费者组 $config->setBrokerVersion('0.10.2.1'); $config->setTopics(['test']); //主题 //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); // $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });
注意:
1) 消费者组能够有多个,互相之间不影响。每一个消费者组均可以消费到完整的一份消息。
2) setOffsetReset
的值有:earliest
(从最先的开始消费)、latest
(从最新的开始消费)。
发送消息示例:
from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092') for _ in range(100): producer.send('test', b'some_message_bytes')
(上图为Kakfa架构图)
一个典型的消息队列 Kafka 集群包含:
Producer
:经过 push 模式向消息队列 Kafka Broker 发送消息,能够是网站的页面访问、服务器日志等,也能够是 CPU 和内存相关的系统资源信息;Kafka Broker
:消息队列 Kafka 的服务器,用于存储消息;支持水平扩展,通常 Broker 节点数量越多,集群吞吐率越高;Consumer Group
:经过 pull 模式从消息队列 Kafka Broker 订阅并消费消息;Zookeeper
:管理集群的配置、选举 leader,以及在 Consumer Group 发生变化时进行负载均衡。Broker
:消息队列 Kafka 集群包含一个或多个消息处理服务器,该服务器被称为 Broker。Topic
:主题。每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。Partition
:分区。一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区能够看做是一个FIFO
( First Input First Output的缩写,先入先出队列)的队列。Producer
: 消息发布者,也称为消息生产者,负责生产并发送消息到 Kafka Broker。Consumer
: 消息订阅者,也称为消息消费者,负责向 Kafka Broker 读取消息并进行消费。Consumer Group
:消费者组。这类 Consumer 一般接收并消费同一类消息,且消费逻辑一致。Consumer Group 和 Topic 的关系是 N:N,同一个 Consumer Group 能够订阅多个 Topic,同一个 Topic 也能够被多个 Consumer Group 订阅。Replication
:副本。为了保证分布式可靠性,kafka0.8开始对每一个分区的数据进行备份(不一样的Broker上),防止其中一个Broker宕机形成分区上的数据不可用。消息队列 Kafka 采用 Pub/Sub
(发布/订阅)模型,其中:
说明:
一、同一个分区(partition)内的消息只能被同一个组中的一个消费者(consumer)消费,当消费者数量多于分区的数量时,多余的消费者空闲。
二、启动多个组,则会使同一个消息被消费屡次。
详细请看:https://www.jianshu.com/p/6233d5341dfe
生产者消费者关系:
对于每个topic, Kafka集群都会维持一个分区日志,以下所示:
kafka分区是提升kafka性能的关键所在,当发现集群性能不高时,经常使用手段就是增长Topic的分区,分区里面的消息是按照重新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。
Kafka 负载消费的内部原理是,把订阅的 Topic 的分区,平均分配给各个消费实例。所以,消费实例的个数不要大于分区的数量,不然会有实例分配不到任何分区而处于空跑状态。这个负载均衡发生的时间,除了第一次启动上线以外,后续消费实例发生重启、增长、减小等变动时,都会触发一次负载均衡。
Kafka支持的配置很是多,这里仅仅列出来部分关于 broker 的配置。broker 配置文件是config/server.properties
。
每一个kafka broker中配置文件默认必须配置的属性以下:
broker.id=0 port=9092 num.network.threads=2 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 log.retention.hours=168 log.segment.bytes=536870912 log.retention.check.interval.ms=60000 log.cleaner.enable=false zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=1000000
配置说明:
参数 | 默认值 | 描述 |
---|---|---|
broker.id | -1 | 用于服务的broker id。若是没设置,将生成一个惟一broker id。为了不ZooKeeper生成的id和用户配置的broker id相冲突,生成的id将在reserved.broker.max.id的值基础上加1。 |
port | 9092 | broker server服务端口。仅在未设置listeners 时使用。 |
host.name | broker的主机地址,如果设置了,那么会绑定到这个地址上,如果没有,会绑定到全部的接口上,并将其中之一发送到ZK。仅在未设置listeners 时使用。 |
|
log.dirs | /tmp/kafka-logs | kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不一样磁盘上能够提升读写性能 /data/kafka-logs-1,/data/kafka-logs-2 |
message.max.bytes | 1000012 | 表示消息体的最大大小,单位是字节 |
num.network.threads | 3 | broker处理消息的最大线程数,通常状况下数量为cpu核数 |
num.io.threads | 8 | 处理IO的线程数 |
log.flush.interval.messages | Long.MaxValue | 在数据被写入到硬盘和消费者可用前最大累积的消息的数量 |
log.flush.interval.ms | Long.MaxValue | 在数据被写入到硬盘前的最大时间 |
log.flush.scheduler.interval.ms | Long.MaxValue | 检查数据是否要写入到硬盘的时间间隔。 |
log.retention.hours | 168 | 控制一个log保留多长个小时 |
log.retention.bytes | -1 | 控制log文件最大尺寸 |
log.cleaner.enable | false | 是否log cleaning |
log.cleanup.policy | delete | delete仍是compat. |
log.segment.bytes | 1073741824 | 单一的log segment文件大小 |
log.roll.hours | 168 | 开始一个新的log文件片断的最大时间 |
background.threads | 10 | 后台线程序 |
num.partitions | 1 | 默认分区数 |
socket.send.buffer.bytes | 102400 | socket SO_SNDBUFF参数 |
socket.receive.buffer.bytes | 102400 | socket SO_RCVBUFF参数 |
zookeeper.connect | 指定zookeeper链接字符串, 格式如hostname:port/chroot。chroot是一个namespace | |
zookeeper.connection.timeout.ms | 6000 | 指定客户端链接zookeeper的最大超时时间 |
zookeeper.session.timeout.ms | 6000 | 链接zk的session超时时间 |
zookeeper.sync.time.ms | 2000 | zk follower落后于zk leader的最长时间 |
auto.create.topics.enable | true | 是否容许在服务器上自动建立topic |
更多配置查看官方文档:http://kafka.apache.org/documentation.html#configuration
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
$ bin/zookeeper-server-stop.sh
$ bin/kafka-server-start.sh config/server.properties &
$ bin/kafka-server-stop.sh
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
$ bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
https://github.com/Morningstar/kafka-offset-monitor
消息队列主要是解决了应用解耦、异常处理、流量削锋等问题。常见的消息队列还有:
ActiveMQ
、RabbitMQ
、RocketMQ
、ZeroMQ
、MetaMQ
等等。固然,咱们也可使用Redis
做为简单的消息队列使用。
消息队列对比参考:
(图片来源于互联网)
一、Apache Kafka
http://kafka.apache.org/documentation/
二、消息队列Kafka、RocketMQ、RabbitMQ的优劣势比较 - 知乎
https://zhuanlan.zhihu.com/p/60288391
三、weiboad/kafka-php: kafka php client
https://github.com/weiboad/kafka-php
四、kafka中partition和消费者对应关系 - 简书
https://www.jianshu.com/p/6233d5341dfe
五、kafka经常使用的命令 - 随笔 - SegmentFault 思否
http://www.javashuo.com/article/p-afrncoki-bk.html
六、消息中间件部署及比较:rabbitMQ、activeMQ、zeroMQ、rocketMQ、Kafka、redis - 掘金
http://www.javashuo.com/article/p-azrmvmrq-gv.html
七、面试官问分布式技术面试题,一脸懵逼怎么办?_ITPUB博客
http://blog.itpub.net/69917606/viewspace-2642545/
八、产品架构_产品简介_消息队列 Kafka-阿里云
https://help.aliyun.com/document_detail/68152.html?spm=a2c4g.11186623.6.543.3ba272e4cAMqaH