Kafka入门宝典(详细截图版)

file

一、了解 Apache Kafka

1.一、简介

file

官网:http://kafka.apache.org/java

  • Apache Kafka 是一个开源消息系统,由Scala 写成。是由Apache 软件基金会开发的一个开源消息系统项目。
  • Kafka 最初是由LinkedIn 开发,并于2011 年初开源。2012 年10 月从Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统1、高通量、低等待(低延时)的平台。
  • Kafka 是一个分布式消息系统:具备生产者、消费者的功能。它提供了相似于JMS 的特性,可是在设计实现上彻底不一样,此外它并非JMS 规范的实现。【重点】

1.二、kafka的基本结构

file

  • Producer:消息的发送者
  • Consumer:消息的接收者
  • kafka cluster:kafka的集群。
  • Topic:就是消息类别名,一个topic中一般放置一类消息。每一个topic都有一个或者多个订阅者(消费者)。

消息的生产者将消息推送到kafka集群,消息的消费者从kafka集群中拉取消息。node

1.三、kafka的完整架构

file

说明:git

  • broker:集群中的每个kafka实例,称之为broker;
  • ZooKeeper:Kafka 利用ZooKeeper 保存相应元数据信息, Kafka 元数据信息包括如代理节点信息、Kafka集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。
  • ConsumerGroup:在Kafka 中每个消费者都属于一个特定消费组( ConsumerGroup ),咱们能够为每一个消费者指定一个消费组,以groupld 表明消费组名称,经过group.id 配置设置。若是不指定消费组,则该消费者属于默认消费组test-consumer-group 。

1.四、kafka的特性

  • 消息持久化
    • Kafka 基于文件系统来存储和缓存消息。
  • 高吞吐量
    • Kafka 将数据写到磁盘,充分利用磁盘的顺序读写。同时, Kafka 在数据写入及数据同步采用了*零拷贝*( zero-copy )技术,采用sendFile()函数调用,sendFile()函数是在两个文件描述符之间直接传递数据,彻底在内核中操做,从而避免了内核缓冲区与用户缓冲区之间数据的拷贝,操做效率极高。
    • Kafka 还支持数据压缩及批量发送,同时Kafka 将每一个主题划分为多个分区,这一系列的优化及实现方法使得Kafka 具备很高的吞吐量。经大多数公司对Kafka 应用的验证, Kafka 支持每秒数百万级别的消息
  • 高扩展性
    • Kafka 依赖ZooKeeper来对集群进行协调管理,这样使得Kafka 更加容易进行水平扩展,生产者、消费者和代理都为分布式,可配置多个。
    • 同时在机器扩展时无需将整个集群停机,集群可以自动感知,从新进行负责均衡及数据复制。
  • 多客户端支持
    • Kafka 核心模块用Scala 语言开发,Kafka 提供了多种开发语言的接入,如Java 、Scala、C 、C++、Python 、Go 、Erlang 、Ruby 、Node. 等。
  • 安全机制
    • Kafka 支持如下几种安全措施:
      • 经过SSL 和SASL(Kerberos), SASL/PLA时验证机制支持生产者、消费者与broker链接时的身份认证;
      • 支持代理与ZooKeeper 链接身份验证;
      • 通讯时数据加密;
      • 客户端读、写权限认证;
      • Kafka 支持与外部其余认证受权服务的集成;
  • 数据备份
    • Kafka 能够为每一个topic指定副本数,对数据进行持久化备份,这能够必定程度上防止数据丢失,提升可用性。
  • 轻量级
    • Kafka 的实例是无状态的,即broker不记录消息是否被消费,消费偏移量的管理交由消费者本身或组协调器来维护。
    • 同时集群自己几乎不须要生产者和消费者的状态信息,这就使得Kafka很是轻量级,同时生产者和消费者客户端实现也很是轻量级。
  • 消息压缩
    • Kafka 支持Gzip, Snappy 、LZ4 这3 种压缩方式,一般把多条消息放在一块儿组成MessageSet,而后再把Message Set 放到一条消息里面去,从而提升压缩比率进而提升吞吐量。

1.五、kafka的应用场景

  • 消息系统。
    • Kafka 做为一款优秀的消息系统,具备高吞吐量、内置的分区、备份冗余分布式等特色,为大规模消息处理提供了一种很好的解决方案。
  • 应用监控。
    • 利用Kafka 采集应用程序和服务器健康相关的指标,如CPU 占用率、IO 、内存、链接数、TPS 、QPS 等,而后将指标信息进行处理,从而构建一个具备监控仪表盘、曲线图等可视化监控系统。例如,不少公司采用Kafka 与ELK (Elastic Search 、Logstash 和Kibana)整合构建应用服务监控系统。
  • 网站用户行为追踪。
    • 为了更好地了解用户行为、操做习惯,改善用户体验,进而对产品升级改进,将用户操做轨迹、内容等信息发送到Kafka 集群上,经过Hadoop 、Spark 或Strom等进行数据分析处理,生成相应的统计报告,为推荐系统推荐对象建模提供数据源,进而为每一个用户进行个性化推荐。
  • 流处理。
    • 须要将己收集的流数据提供给其余流式计算框架进行处理,用Kafka 收集流数据是一个不错的选择。
  • 持久性日志。
    • Kafka 能够为外部系统提供一种持久性日志的分布式系统。日志能够在多个节点间进行备份, Kafka 为故障节点数据恢复提供了一种从新同步的机制。同时, Kafka很方便与HDFS 和Flume 进行整合,这样就方便将Kafka 采集的数据持久化到其余外部系统。

二、Kafka的安装与配置

准备三台虚拟机,分别是node01,node02,node03,而且修改hosts文件以下:github

~~~shellvim /etc/hostsshell

注意: 前面的ip地址改为本身的ip地址

192.168.40.133 node01192.168.40.134 node02192.168.40.135 node03apache

3台服务器的时间要一致

时间更新:

yum install -y rdaterdate -s time-b.nist.gov~~~bootstrap

2.一、基础环境配置

2.1.一、JDK环境

因为Kafka 是用Scala 语言开发的,运行在JVM上,所以在安装Kafka 以前须要先安装JDK 。vim

安装过程略过,我这里使用的是jdk1.8。api

file

2.1.二、ZooKeeper环境

2.1.2.一、安装ZooKeeper

Kafka 依赖ZooKeeper ,经过ZooKeeper 来对服务节点、消费者上下线管理、集群、分区元数据管理等,所以ZooKeeper 也是Kafka 得以运行的基础环境之一。浏览器

#上传zookeeper-3.4.9.tar.gz到/export/software
cd /export/software
mkdir -p /export/servers/
tar -xvf zookeeper-3.4.9.tar.gz -C /export/servers/
#建立ZooKeeper的data目录
mkdir /export/data/zookeeper -p
cd /export/servers/zookeeper-3.4.9/conf/
#修改配置文件
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
#设置data目录
dataDir=/export/data/zookeeper
#启动ZooKeeper
./zkServer.sh start
#检查是否启动成功
jps复制代码

2.1.2.三、搭建ZooKeeper集群

~~~shell

在/export/data/zookeeper目录中建立myid文件

vim /export/data/zookeeper/myid

写入对应的节点的id,如:1,2等,保存退出

在conf下,修改zoo.cfg文件

vim zoo.cfg

添加以下内容

server.1=node01:2888:3888server.2=node02:2888:3888server.3=node03:2888:3888~~~

2.1.2.三、配置环境变量

~~~shellvim /etc/profileexport ZK_HOME=/export/servers/zookeeper-3.4.9export PATH=${ZK_HOME}/bin:$PATH

当即生效

source /etc/profile~~~

2.1.2.四、分发到其它机器

~~~shellscp /etc/profile node02:/etc/scp /etc/profile node03:/etc/

cd /export/serversscp -r zookeeper-3.4.9 node02:/export/servers/scp -r zookeeper-3.4.9 node03:/export/servers/~~~

2.1.2.五、一键启动、中止脚本

~~~shellmkdir -p /export/servers/onekey/zkvim slave

输入以下内容

node01node02node03

保存退出

vim startzk.sh

输入以下内容

cat /export/servers/onekey/zk/slave | while read linedo{echo "开始启动 --> "$linessh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh start >/dev/null 2>&1 &"}&waitdoneecho "★★★启动完成★★★"

保存退出

vim stopzk.sh

输入以下内容

cat /export/servers/onekey/zk/slave | while read linedo{echo "开始中止 --> "$linessh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh stop >/dev/null 2>&1 &"}&waitdoneecho "★★★中止完成★★★"

保存退出

设置可执行权限

chmod +x startzk.sh stopzk.sh

添加到环境变量中

export ZK_ONEKEY=/export/servers/onekeyexport PATH=${ZK_ONEKEY}/zk:$PATH~~~

2.1.2.六、检查启动是否成功

file

发现三台机器都有“QuorumPeerMain”进程,说明机器已经启动成功了。

检查集群是否正常:

zkServer.sh status

file

file

file

发现,集群运行一切正常。

2.二、安装Kafka

2.2.一、单机版Kafka安装

第一步:上传Kafka安装包而且解压

~~~shellrz 上传kafka_2.11-1.1.0.tgz到 /export/software/cd /export/software/tar -xvf kafka_2.11-1.1.0.tgz -C /export/servers/cd /export/serversmv kafka_2.11-1.1.0/ kafka~~~

第二步:配置环境变量

~~~shellvim /etc/profile

输入以下内容

export KAFKA_HOME=/export/servers/kafkaexport PATH=${KAFKA_HOME}/bin:$PATH

保存退出

source /etc/profile~~~

第三步:修改配置文件

~~~shellcd /export/servers/kafkacd configvim server.properties

The id of the broker. This must be set to a unique integer for each broker.

必需要只要一个brokerid,而且它必须是惟一的。

broker.id=0

A comma separated list of directories under which to store log files

日志数据文件存储的路径 (如不存在,须要手动建立该目录, mkdir -p /export/data/kafka/)

log.dirs=/export/data/kafka

ZooKeeper的配置,本地模式下指向到本地的ZooKeeper服务便可

zookeeper.connect=node01:2181

保存退出

~~~

第四步:启动kafka服务

~~~shell

以守护进程的方式启动kafka

kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties~~~

第五步:检测kafka是否启动

file

若是进程中有名为kafka的进程,就说明kafka已经启动了。

2.2.二、验证kafka是否安装成功

因为kafka是将元数据保存在ZooKeeper中的,因此,能够经过查看ZooKeeper中的信息进行验证kafka是否安装成功。

file

file

file

2.2.三、部署kafka-manager

Kafka Manager 由 yahoo 公司开发,该工具能够方便查看集群 主题分布状况,同时支持对 多个集群的管理、分区平衡以及建立主题等操做。

源码托管于github:https://github.com/yahoo/kafka-manager

第一步:上传Kafka-manager安装包而且解压

~~~shellrz上传kafka-manager-1.3.3.17.tar.gz到 /export/software/cd /export/softwaretar -xvf kafka-manager-1.3.3.17.tar.gz -C /export/servers/cd /export/servers/kafka-manager-1.3.3.17/conf~~~

第二步:修改配置文件

~~~shell

修改配置文件

vim application.conf

新增项,http访问服务的端口

http.port=19000

修改为本身的zk机器地址和端口

kafka-manager.zkhosts="node01:2181"

保存退出

~~~

第三步:启动服务

~~~shellcd /export/servers/kafka-manager-1.3.3.17/bin

启动服务

./kafka-manager -Dconfig.file=../conf/application.conf

制做启动脚本

vim /etc/profileexport KAFKAMANAGEHOME=/export/servers/kafka-manager-1.3.3.17export PATH=${KAFKAMANAGEHOME}/bin:$PATH

source /etc/profile

cd /export/servers/onekey/mkdir kafka-managercd kafka-managervim start-kafka-manager.shnohup kafka-manager -Dconfig.file=${KAFKAMANAGEHOME}/conf/application.conf >/dev/null 2>&1 &chmod +x start-kafka-manager.shvim /etc/profileexport PATH=${ZK_ONEKEY}/kafka-manager:$PATHsource /etc/profile

~~~

第四步:检查是否启动成功

打开浏览器,输入地址:http://node01:19000/,便可看到kafka-manage管理界面。

file

2.2.四、kafka-manager的使用

进入管理界面,是没有显示Cluster信息的,须要添加后才能操做。

  • 添加 Cluster:

file

输入Cluster Name、ZooKeeper信息、以及Kafka的版本信息(这里最高只能选择1.0.0)。

file

点击Save按钮保存。

file

添加成功。

  • 查看kafka的信息
    file
  • 查看Broker信息
    file
  • 查看Topic列表
    file
  • 查看单个topic信息以及操做
    file
  • 优化副本选举
    file
  • 查看消费者信息
    file

2.2.五、搭建kafka集群

kafka集群的搭建是很是简单的,只须要将上面的单机版的kafka分发的其余机器,而且将ZooKeeper信息修改为集群的配置以及设置不一样的broker值便可。

第一步:将kafka分发到node0二、node03

~~~cd /export/servers/scp -r kafka node02:/export/servers/scp -r kafka node03:/export/servers/scp /etc/profile node02:/etc/scp /etc/profile node03:/etc/

分别到node0二、node03机器上执行

source /etc/profile~~~

第二步:修改node0一、node0二、node03上的kafka配置文件

  • node01:

~~~shellcd /export/servers/kafka/configvim server.propertieszookeeper.connect=node01:2181,node02:2181,node03:2181~~~

  • node02:

~~~shellcd /export/servers/kafka/configvim server.propertiesbroker.id=1zookeeper.connect=node01:2181,node02:2181,node03:2181~~~

  • node03:

~~~shellcd /export/servers/kafka/configvim server.propertiesbroker.id=2zookeeper.connect=node01:2181,node02:2181,node03:2181~~~

第三步:编写一键启动、中止脚本。注意:该脚本依赖于环境变量中的KAFKA_HOME。

~~~shellmkdir -p /export/servers/onekey/kafkavim slave

输入以下内容

node01node02node03

保存退出

vim start-kafka.sh

输入以下内容

cat /export/servers/onekey/kafka/slave | while read linedo{echo "开始启动 --> "$linessh $line "source /etc/profile;nohup sh ${KAFKAHOME}/bin/kafka-server-start.sh -daemon ${KAFKAHOME}/config/server.properties >/dev/null 2>&1 &"}&waitdoneecho "★★★启动完成★★★"

保存退出

chmod +x start-kafka.sh

vim stop-kafka.sh

输入以下内容

cat /export/servers/onekey/kafka/slave | while read linedo{echo "开始中止 --> "$linessh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-stop.sh >/dev/null 2>&1 &"}&waitdoneecho "★★★中止完成★★★"

保存退出

chmod +x stop-kafka.sh

加入到环境变量中

export PATH=${ZK_ONEKEY}/kafka:$PATHsource /etc/profile~~~

第四步:经过kafka-manager管理工具查看集群信息。file

因而可知,kafka集群已经启动完成。

三、Kafka快速入门

对kafka的操做有2种方式,一种是经过命令行方式,一种是经过API方式。

3.一、经过命令行Kafka

Kafka在bin目录下提供了shell脚本文件,能够对Kafka进行操做,分别是:file经过命令行的方式,咱们将体验下kafka,以便咱们对kafka有进一步的认知。

3.1.一、topic的操做

3.1.1.一、建立topic

~~~shellkafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic

执行结果:

Created topic "my-kafka-topic".~~~

参数说明:

  • zookeeper:参数是必传参数,用于配置 Kafka 集群与 ZooKeeper 链接地址。至少写一个。
  • partitions:参数用于设置主题分区数,该配置为必传参数。
  • replication-factor:参数用来设置主题副本数 ,该配置也是必传参数。
  • topic:指定topic的名称。
3.1.1.二、查看topic列表

~~~shellkafka-topics.sh --list --zookeeper node01:2181

__consumer_offsetsmy-kafka-topic~~~

能够查看列表。

若是须要查看topic的详细信息,须要使用describe命令。

~~~shellkafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic

若不指定topic,则查看全部topic的信息

kafka-topics.sh --describe --zookeeper node01:2181~~~

3.1.1.三、删除topic

经过kafka-topics.sh执行删除动做,须要在server.properties文件中配置 delete.topic.enable=true,该配置默认为 false。

不然执行该脚本并未真正删除主题 ,将该topic标记为删除状态 。

~~~shellkafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic

执行以下

[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topicTopic my-kafka-topic is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.

若是将delete.topic.enable=true

[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2Topic my-kafka-topic2 is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.

说明:虽然设置后,删除时依然提示没有设置为true,实际上已经删除了。

~~~

3.1.二、生产者的操做

~~~shellkafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic~~~

能够看到,已经向topic发送了消息。

3.1.三、消费者的操做

~~~shellkafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic

经过以上命令,能够看到消费者能够接收生产者发送的消息

若是须要从头开始接收数据,须要添加--from-beginning参数

kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic~~~

file

3.二、经过Java Api操做Kafka

除了经过命令行的方式操做kafka外,还能够经过Java api的方式操做,这种方式将更加的经常使用。

3.2.一、建立工程

file

导入依赖:

~~~xml itcast-bigdata cn.itcast.bigdata 1.0.0-SNAPSHOT 4.0.0

<artifactId>itcast-bigdata-kafka</artifactId>复制代码
<dependencies>复制代码
<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>复制代码
</dependencies>复制代码
<build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
复制代码

~~~

3.2.二、topic的操做

因为主题的元数据信息是注册在 ZooKeeper 相 应节点之中,因此对主题的操做实质是对 ZooKeeper 中记录主题元数据信息相关路径的操做。 Kafka将对 ZooKeeper 的相关操做封装成一 个 ZkUtils 类 , 井封装了一个AdrninUtils 类调用 ZkClient 类的相关方法以实现对 Kafka 元数据 的操做,包括对主题、代理、消费者等相关元数据的操做。对主题操做的相关 API调用较简单, 相应操做都是经过调用 AdminUtils类的相应方法来完成的。

~~~javapackage cn.itcast.kafka;

import kafka.admin.AdminUtils;import kafka.utils.ZkUtils;import org.apache.kafka.common.security.JaasUtils;import org.junit.Test;

import java.util.Properties;

public class TestKafkaTopic {

@Test
    public void testCreateTopic() {
        ZkUtils zkUtils = null;
        try {
            //参数:zookeeper的地址,session超时时间,链接超时时间,是否启用zookeeper安全机制
            zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());复制代码
String topicName = "my-kafka-topic-test1";
            if (!AdminUtils.topicExists(zkUtils, topicName)) {
                //参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式
                AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
                System.out.println(topicName + " 建立成功!");
            } else {
                System.out.println(topicName + " 已存在!");
            }
        } finally {
            if (null != zkUtils) {
                zkUtils.close();
            }
        }复制代码
}
}复制代码

~~~

测试结果:

file

3.2.2.一、删除topic

~~~java@Testpublic void testDeleteTopic() {ZkUtils zkUtils = null;try {//参数:zookeeper的地址,session超时时间,链接超时时间,是否启用zookeeper安全机制zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());String topicName = "my-kafka-topic-test1";if (AdminUtils.topicExists(zkUtils, topicName)) {//参数:zkUtils,topic名称AdminUtils.deleteTopic(zkUtils, topicName);System.out.println(topicName + " 删除成功!");} else {System.out.println(topicName + " 不已存在!");}} finally {if (null != zkUtils) {zkUtils.close();}}

}
~~~复制代码

测试结果:

file

3.2.三、生产者的操做

~~~javapackage cn.itcast.kafka;

import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import org.junit.Test;

import java.util.Properties;

public class TestProducer {

@Test
    public void testProducer() throws InterruptedException {
        Properties config = new Properties();复制代码
// 设置kafka服务列表,多个用逗号分隔
        config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
        // 设置序列化消息 Key 的类
        config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置序列化消息 value 的类
        config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());复制代码
// 初始化
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config);
        for (int i = 0; i < 100 ; i++) {
            ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i);
            // 发送消息
            kafkaProducer.send(record);
            System.out.println("发送消息 --> " + i);复制代码
Thread.sleep(100);
        }复制代码
kafkaProducer.close();复制代码
}复制代码

}

~~~

3.2.四、消费者的操做

~~~javapackage cn.itcast.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import org.junit.Test;

import javax.sound.midi.Soundbank;import java.util.Arrays;import java.util.Properties;

public class TestConsumer {

@Test
    public void testConsumer() {
        Properties config = new Properties();
        // 设置kafka服务列表,多个用逗号分隔
        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
        // 设置消费者分组id
        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // 设置序反列化消息 Key 的类
        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 设置序反列化消息 value 的类
        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
复制代码
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
        // 订阅topic
        kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));复制代码
while (true) { // 使用死循环不断的拉取数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                String value = record.value();
                long offset = record.offset();
                System.out.println("value = " + value + ", offset = " + offset);
            }
        }复制代码
}
}
~~~

复制代码

什么是Kafka?Kafka监控工具汇总Kafka快速入门Kafka核心之ConsumerKafka核心之Producer

替代Flume——Kafka Connect简介最简单流处理引擎——Kafka Streams简介

更多实时计算,Flink,Kafka等相关技术博文,欢迎关注实时流式计算

file

相关文章
相关标签/搜索