Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,以后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
Apache Kafka与传统消息系统相比,有如下不一样:node
它被设计为一个分布式系统,易于向外扩展;apache
它同时为发布和订阅提供高吞吐量;bootstrap
它支持多订阅者,当失败时能自动平衡消费者;vim
它将消息持久化到磁盘,所以可用于批量消费,例如ETL,以及实时应用程序。centos
下载地址:https://kafka.apache.org/downloadsbash
wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
解压:服务器
tar -zxvf kafka_2.11-1.0.0.tgz cd /usr/local/kafka_2.11-1.0.0/
修改 kafka-server 的配置文件session
vim /usr/local/kafka/config/server.properties
修改其中的:分布式
broker.id=1 log.dir=/data/kafka/logs-1
使用安装包中的脚本启动单节点 Zookeeper 实例:ide
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
使用 kafka-server-start.sh 启动 kafka 服务:
bin/kafka-server-start.sh config/server.properties
启动完成
[2018-07-20 16:18:07,243] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator) [2018-07-20 16:18:07,284] INFO [ProducerId Manager 1]: Acquired new producerId block (brokerId:1,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager) [2018-07-20 16:18:07,321] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator) [2018-07-20 16:18:07,330] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator) [2018-07-20 16:18:07,355] INFO [Transaction Marker Channel Manager 1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager) [2018-07-20 16:18:07,402] INFO Got user-level KeeperException when processing sessionid:0x164b6c0523f0000 type:delete cxid:0x42 zxid:0x1a txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor) [2018-07-20 16:18:07,464] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2018-07-20 16:18:07,466] INFO Got user-level KeeperException when processing sessionid:0x164b6c0523f0000 type:create cxid:0x4b zxid:0x1b txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor) [2018-07-20 16:18:07,467] INFO Got user-level KeeperException when processing sessionid:0x164b6c0523f0000 type:create cxid:0x4c zxid:0x1c txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor) [2018-07-20 16:18:07,474] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2018-07-20 16:18:07,476] INFO Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(10.66.95.67,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils) [2018-07-20 16:18:07,479] WARN No meta.properties file under dir /tmp/kafka-logs-1/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2018-07-20 16:18:07,514] INFO Kafka version : 1.0.1 (org.apache.kafka.common.utils.AppInfoParser) [2018-07-20 16:18:07,514] INFO Kafka commitId : c0518aa65f25317e (org.apache.kafka.common.utils.AppInfoParser) [2018-07-20 16:18:07,526] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
使用 kafka-topics.sh 建立单分区单副本的 topic test:
[root@V1 kafka_2.12-1.0.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看 topic 列表:
[root@V1 kafka_2.12-1.0.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181
使用 kafka-console-producer.sh 发送消息:
[root@V2 kafka_2.12-1.0.1]# bin/kafka-console-producer.sh --broker-list xxx.xxx.xxx.xxx:9092 --topic test >wwwwwwwwwwwwwwwwwwwwwwwwwwwwwww
使用 kafka-console-consumer.sh 接收消息并在终端打印:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
打开个新的命令窗口执行上面命令便可查看信息:
[root@PaulV1 kafka_2.12-1.0.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. hhhhhhhhhhhhhhhhhh wwwwwwwwwwwwwwwwwwwwwwwwwwwwwww
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
显示结果
[root@V1 kafka_2.12-1.0.1]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
第一行给出了全部分区的摘要,每一个附加行给出了关于一个分区的信息。 因为咱们只有一个分区,因此只有一行。
“Leader” : 是负责给定分区的全部读取和写入的节点。 每一个节点将成为分区随机选择部分的领导者。
“Replicas” : 是复制此分区日志的节点列表,不管它们是不是领导者,或者即便他们当前处于活动状态。
“Isr” : 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者。
Kafka 支持两种模式的集群搭建:能够在单机上运行多个 broker 实例来实现集群,也可在多台机器上搭建集群,下面介绍下如何实现单机多 broker 实例集群,其实很简单,只须要以下配置便可。
利用单节点部署多个 broker。 不一样的 broker 设置不一样的 id,监听端口及日志目录。 例如:
cp config/server.properties config/server-2.properties cp config/server.properties config/server-3.properties vim config/server-2.properties vim config/server-3.properties
修改 :
broker.id=2 listeners = PLAINTEXT://your.host.name:9093 log.dir=/data/kafka/logs-2
和
broker.id=3 listeners = PLAINTEXT://your.host.name:9094 log.dir=/data/kafka/logs-3
启动Kafka服务:
bin/kafka-server-start.sh config/server-2.properties & bin/kafka-server-start.sh config/server-3.properties &
至此,单机多broker实例的集群配置完毕。
分别在多个节点按上述方式安装 Kafka,配置启动多个 Zookeeper 实例。
假设三台机器 IP 地址是 : 192.168.153.135, 192.168.153.136, 192.168.153.137
分别配置多个机器上的 Kafka 服务,设置不一样的 broker id,zookeeper.connect 设置以下:
vim config/server.properties
里面的 zookeeper.connect
修改成:
zookeeper.connect=192.168.153.135:2181,192.168.153.136:2181,192.168.153.137:2181
从控制台写入数据并将其写回控制台是一个方便的起点,但您可能想要使用其余来源的数据或将数据从 Kafka 导出到其余系统。对于许多系统,您可使用 Kafka Connect 来导入或导出数据,而没必要编写自定义集成代码。
Kafka Connect 是 Kafka 包含的一个工具,能够将数据导入和导出到 Kafka。它是一个可扩展的工具,运行 链接器,实现与外部系统交互的自定义逻辑。在这个快速入门中,咱们将看到如何使用简单的链接器运行 Kafka Connect,这些链接器将数据从文件导入到 Kafka topic,并将数据从 Kafka topic 导出到文件。
首先,咱们将经过建立一些种子数据开始测试:
echo -e "zhisheng\ntian" > test.txt
接下来,咱们将启动两个以独立模式运行的链接器,这意味着它们将在单个本地专用进程中运行。咱们提供三个配置文件做为参数。首先是 Kafka Connect 过程的配置,包含常见的配置,例如要链接的 Kafka 代理以及数据的序列化格式。其他的配置文件都指定一个要建立的链接器。这些文件包括惟一的链接器名称,要实例化的链接器类以及链接器所需的任何其余配置。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Kafka 附带的这些示例配置文件使用您以前启动的默认本地群集配置,并建立两个链接器:第一个是源链接器,用于读取输入文件中的行,并将每一个链接生成为 Kafka topic,第二个为链接器它从 Kafka topic 读取消息,并在输出文件中产生每行消息。
在启动过程当中,您会看到一些日志消息,其中一些指示链接器正在实例化。Kafka Connect 进程启动后,源链接器应该开始读取 test.txt topic connect-test,并将其生成 topic ,而且接收器链接器应该开始读取 topic 中的消息 connect-test 并将其写入文件 test.sink.txt。咱们能够经过检查输出文件的内容来验证经过整个管道传输的数据:
数据存储在 Kafka topic 中 connect-test,所以咱们也能够运行控制台使用者来查看 topic 中的数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
链接器继续处理数据,因此咱们能够将数据添加到文件中,并看到它在管道中移动:
echo zhishengtian>> test.txt echo zhishengtian2>> test.txt echo zhishengtian3>> test.txt echo zhishengtian4>> test.txt
Kafka Streams 是用于构建关键任务实时应用程序和微服务的客户端库,输入和/或输出数据存储在 Kafka 集群中。Kafka Streams 结合了在客户端编写和部署标准 Java 和 Scala 应用程序的简单性以及 Kafka 服务器端集群技术的优点,使这些应用程序具备高度可伸缩性,弹性,容错性,分布式等特性。
可参考官网入门案例:http://kafka.apache.org/10/documentation/streams/quickstart
二、http://kafka.apache.org/10/documentation/streams/quickstart