kafka是一个分布式消息队列。具备高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。通常在架构设计中起到解耦、削峰、异步处理的做用。html
wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz
复制代码
tar -zxvf kafka_2.11-2.2.0.tgz
复制代码
修改 kafka-server 的配置文件java
cd kafka_2.11-2.2.0
vim config/server.properties
复制代码
修改其中的:node
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka-logs
复制代码
bin/zookeeper-server-start.sh config/zookeeper.properties
复制代码
使用 kafka-server-start.sh
启动 kafka 服务:web
bin/kafka-server-start.sh config/server.properties
复制代码
使用 kafka-topics.sh
建立单分区单副本的 topic demo
:算法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
复制代码
查看 topic 列表:spring
bin/kafka-topics.sh --list --zookeeper localhost:2181
复制代码
使用 kafka-console-producer.sh
发送消息:apache
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
复制代码
使用 kafka-console-consumer.sh
接收消息并在终端打印:bootstrap
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
复制代码
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
,高版本已经不支持
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
复制代码
[root@localhost kafka_2.11-2.2.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
Topic:demo PartitionCount:1 ReplicationFactor:1 Configs:
Topic: demo Partition: 0 Leader: 1 Replicas: 1 Isr: 1
复制代码
第一行给出了全部分区的摘要,每一个附加行给出了关于一个分区的信息。 因为咱们只有一个分区,因此只有一行。vim
Kafka 支持两种模式的集群搭建:能够在单机上运行多个 broker 实例来实现集群,也可在多台机器上搭建集群,下面介绍下如何实现单机多 broker 实例集群,其实很简单,只须要以下配置便可。安全
利用单节点部署多个 broker。 不一样的 broker 设置不一样的 id,监听端口及日志目录。 例如:
cp config/server.properties config/server-2.properties
vi config/server-2.properties
复制代码
修改内容:
broker.id=2
listeners = PLAINTEXT://127.0.0.1:9093
log.dirs=/data/kafka-logs2
复制代码
一样,配置第三个broker:
cp config/server-2.properties config/server-3.properties
vi config/server-3.properties
复制代码
修改内容:
broker.id=2
listeners = PLAINTEXT://127.0.0.1:9093
log.dirs=/data/kafka-logs2
复制代码
listeners 申明此kafka服务器须要监听的端口号,默认会使用localhost的地址,若是是在远程服务器上运行则必须配置,例如: listeners=PLAINTEXT:// 192.168.180.128:9092 并确保服务器的9092端口可以访问
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &
复制代码
至此,单机多broker实例的集群配置完毕。
分别在多个节点按上述方式安装 Kafka,配置启动多个 Zookeeper 实例。
假设三台机器 IP 地址是 : 192.168.153.135, 192.168.153.136, 192.168.153.137
分别配置多个机器上的 Kafka 服务,设置不一样的 broker id,zookeeper.connect 设置以下:
config/server.properties里面的 zookeeper.connect
zookeeper.connect=192.168.153.135:2181,192.168.153.136:2181,192.168.153.137:2181
复制代码
从控制台写入数据并将其写回控制台是一个方便的起点,但您可能想要使用其余来源的数据或将数据从 Kafka 导出到其余系统。对于许多系统,您可使用 Kafka Connect 来导入或导出数据,而没必要编写自定义集成代码。
Kafka Connect 是 Kafka 包含的一个工具,能够将数据导入和导出到 Kafka。它是一个可扩展的工具,运行 链接器,实现与外部系统交互的自定义逻辑。在这个快速入门中,咱们将看到如何使用简单的链接器运行 Kafka Connect,这些链接器将数据从文件导入到 Kafka topic,并将数据从 Kafka topic 导出到文件。
参考:
http://www.54tianzhisheng.cn/2018/01/04/Kafka/
复制代码
http://kafka.apache.org/10/documentation/streams/quickstart
复制代码
http://kafka.apache.org/20/documentation.html#quickstart
复制代码
cp config/server.properties config/server-idea.properties
vi config/server-idea.properties
broker.id=999
listeners = PLAINTEXT://192.168.1.177:9999
log.dirs=/data/kafka-logs-999
复制代码
192.168.1.177为kafka所在机器的ip地址,9999端口号是对外提供的端口,下文会使用到
很简单的一个小demo,能够直接拷贝使用。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
KafkaTemplate kafkaTemplate = context.getBean(KafkaTemplate.class);
for (int i = 0; i < 10; i++) {
//调用消息发送类中的消息发送方法
kafkaTemplate.send("mytopic", System.currentTimeMillis() + "发送" + i);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@KafkaListener(topics = {"mytopic"},groupId = "halburt-demo2")
public void consumer1(String message) {
System.out.println("consumer1收到消息:" + message);
}
@KafkaListener(topics = {"mytopic"} ,groupId = "halburt-demo")
public void consumer2(ConsumerRecord<?, ?> record) {
System.out.println("consumer2收到消息");
System.out.println(" topic" + record.topic());
System.out.println(" key:" + record.key());
System.out.println(" value:"+record.value());
}
}
复制代码
server:
port: 8090
spring:
kafka:
consumer:
auto-commit-interval: 100
bootstrap-servers: 192.168.1.177:9999
enable-auto-commit: true
group-id: halburt-demo
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 1
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
concurrency: 5
producer:
bootstrap-servers: 192.168.1.177:9999
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
192.168.1.177:9999即为kafka的配置文件中配置
spring-boot.version:2.1.3.RELEASE spring-kafka.version:2.2.0.RELEASE
【此处有坑】此处依赖版本能够不用这2个版本,可是必定要注意springboot和kafka的版本对应
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
</dependencies>
复制代码
bin/kafka-server-start.sh config/server-idea.properties &
复制代码
cd /home/hd/kafka_2.11-2.2.0/
bin/zookeeper-server-start.sh config/zookeeper.properties&
复制代码
kafka启动成功以后,run Application,会看到日志以下:
若是你是从头开始跟这个本文学习的,那么你直接启动的话,会发现消息发出去了,可是没有接收到。 我也是查了很久,看了不少教程,别人都行我就不行。 若是你的zk有其余的topic节点的话,会收不到消息,直接上解决方案:删除全部的zk节点。怎么删除?
/**
* zookeeper znode递归删除节点
* @author Halburt
*
*/
public class DeleteZkNode {
//zookeeper的地址
private static final String connectString = "192.168.1.177:2181";
private static final int sessionTimeout = 2000;
private static ZooKeeper zookeeper = null;
/**
* main函数
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//调用rmr,删除全部目录
rmr("/");
}
/**
* 递归删除 由于zookeeper只容许删除叶子节点,若是要删除非叶子节点,只能使用递归
* @param path
* @throws IOException
*/
public static void rmr(String path) throws Exception {
ZooKeeper zk = getZookeeper();
//获取路径下的节点
List<String> children = zk.getChildren(path, false);
for (String pathCd : children) {
//获取父节点下面的子节点路径
String newPath = "";
//递归调用,判断是不是根节点
if (path.equals("/")) {
newPath = "/" + pathCd;
} else {
newPath = path + "/" + pathCd;
}
rmr(newPath);
}
//删除节点,并过滤zookeeper节点和 /节点
if (path != null && !path.trim().startsWith("/zookeeper") && !path.trim().equals("/")) {
zk.delete(path, -1);
//打印删除的节点路径
System.out.println("被删除的节点为:" + path);
}
}
/**
* 获取Zookeeper实例
* @return
* @throws IOException
*/
public static ZooKeeper getZookeeper() throws IOException {
zookeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
return zookeeper;
}
}
复制代码
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.177:9999 --topic mytopic --from-beginning
复制代码
Kafka Tool 2是一款Kafka的可视化客户端工具,能够很是方便的查看Topic的队列信息以及消费者信息以及kafka节点信息。下载地址:www.kafkatool.com/download.ht…
下载安装以后会弹出一个配置链接的窗口,咱们能够看到这个窗口左上角为Add Cluster(添加集群),但不要紧,对应单节点的Kafka实例来讲也是能够的,由于这个软件监控的是Zookeeper而不是Kafka,Kafka的集群搭建也是依赖Zookeeper来实现的,因此默认状况下咱们都是直接经过Zookeeper去完成大部分操做。
咱们能够看到已经建立好的Topic。这个软件默认显示数据的类型为Byte,能够在设置里面找到对应的修改选项
kafka对外使用topic的概念,生产者往topic里写消息,消费者从topic读消息。为了作到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,能够经过增长partition的数量来进行横向扩容。单个parition内是保证消息有序。每新写一条消息,kafka就是在对应的文件append写,因此性能很是高。kafka的整体数据流是这样的:
Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,而后进行业务处理。
消费者: Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪一个partition;好比基于"round-robin"方式或者经过其余的一些算法等.
每一个consumer属于一个consumer group;反过来讲,每一个group中能够有多个consumer.发送到Topic的消息,只会被订阅此Topic的每一个group中的一个consumer消费(对于一条消息来讲,同一组的消费者只会有一个消费者去消费).
若是全部的consumer都具备相同的group,这种状况和queue模式很像;消息将会在consumers之间负载均衡. 若是全部的consumer都具备不一样的group,那这就是"发布-订阅";消息将会广播给全部的消费者.
在kafka中,一个partition中的消息只会被group中的一个consumer消费;每一个group中consumer消息消费互相独立;咱们能够认为一个group是一个"订阅"者,一个Topic中的每一个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer能够消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。事实上,从Topic角度来讲,消息仍不是有序的。
一个Topic能够认为是一类消息,每一个topic将被分红多个partition(区),每一个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是惟一标记一条消息。它惟一的标记一条消息。kafka并无提供其余额外的索引机制来存储offset,由于在kafka中几乎不容许对消息进行“随机读写”。
topic物理上的分组,一个topic能够分为多个partition,每一个partition是一个有序的队列
如下是单个生产者和消费者从两个分区主题读取和写入的简单示例。
此图显示了一个producer向2个partition分区写入日志,以及消费者从相同日志中读取的内容。日志中的每条记录都有一个相关的条目号,称之为偏移量offset。消费者使用此偏移来记录其在partitiond读取日志的位置。
固然若是存在多个消费者的话,根据groupId分组,同一组的消费者不会重复读取日志。
换句话说:
订阅topic是以一个消费组来订阅的,一个消费组里面能够有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来讲,就是一个partition,只能被消费组里的一个消费者消费,可是能够同时被多个消费组消费。所以,若是消费组内的消费者若是比partition多的话,那么就会有个别消费者一直空闲。
复制代码
其实consumer可使用任意顺序消费日志消息,它只须要将offset重置为任意值.(offset将会保存在zookeeper中,kafka集群几乎不须要维护任何consumer和producer状态信息,这些信息有zookeeper保存)
partition有多个.最根本缘由是kafka基于文件存储.经过分区,能够将日志内容分散到多个partition上,来避免文件大小达到单机磁盘的上限,每一个partiton都会被当前server(kafka实例)保存;能够将一个topic切分多任意多个partitions,来消息保存/消费的效率.此外越多的partitions意味着能够容纳更多的consumer,有效提高并发消费的能力.
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可使kafka具备良好的扩展性和性能优点.不过到目前为止,咱们应该很清楚认识到,kafka并无提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;kafka只能使用做为"常规"的消息系统,在必定程度上,还没有确保消息的发送与接收绝对可靠(好比,消息重发,消息发送丢失等)
kafka的特性决定它很是适合做为"日志收集中心";application能够将操做日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka能够批量提交消息/压缩消息等,这对producer端而言,几乎感受不到性能的开支.此时consumer端可使hadoop等其余系统化的存储和分析系统.
能够将网页/用户操做等信息发送到kafka中.并实时监控,或者离线统计分析等
等等其余场景
############################# Server Basics #############################
# 节点的ID,必须与其它节点不一样
broker.id=0
# 选择启用删除主题功能,默认false
#delete.topic.enable=true
############################# Socket Server Settings #############################
# 套接字服务器坚挺的地址。若是没有配置,就使用java.net.InetAddress.getCanonicalHostName()的返回值
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# 节点的主机名会通知给生产者和消费者。若是没有设置,若是配置了"listeners"就使用"listeners"的值。
# 不然就使用java.net.InetAddress.getCanonicalHostName()的返回值
#advertised.listeners=PLAINTEXT://your.host.name:9092
# 将侦听器的名称映射到安全协议,默认状况下它们是相同的。有关详细信息,请参阅配置文档
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 服务器用来接受请求或者发送响应的线程数
num.network.threads=3
# 服务器用来处理请求的线程数,可能包括磁盘IO
num.io.threads=8
# 套接字服务器使用的发送缓冲区大小
socket.send.buffer.bytes=102400
# 套接字服务器使用的接收缓冲区大小
socket.receive.buffer.bytes=102400
# 单个请求最大能接收的数据量
socket.request.max.bytes=104857600
############################# Log Basics #############################
# 一个逗号分隔的目录列表,用来存储日志文件
log.dirs=/tmp/kafka-logs
# 每一个主题的日志分区的默认数量。更多的分区容许更大的并行操做,可是它会致使节点产生更多的文件
num.partitions=1
# 每一个数据目录中的线程数,用于在启动时日志恢复,并在关闭时刷新。
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# 内部主题设置
# 对于除了开发测试以外的其余任何东西,group元数据内部主题的复制因子“__consumer_offsets”和“__transaction_state”,建议值大于1,以确保可用性(如3)。
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# 在强制刷新数据到磁盘以前容许接收消息的数量
#log.flush.interval.messages=10000
# 在强制刷新以前,消息能够在日志中停留的最长时间
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# 如下的配置控制了日志段的处理。策略能够配置为每隔一段时间删除片断或者到达必定大小以后。
# 当知足这些条件时,将会删除一个片断。删除老是发生在日志的末尾。
# 一个日志的最小存活时间,能够被删除
log.retention.hours=168
# 一个基于大小的日志保留策略。段将被从日志中删除只要剩下的部分段不低于log.retention.bytes。
#log.retention.bytes=1073741824
# 每个日志段大小的最大值。当到达这个大小时,会生成一个新的片断。
log.segment.bytes=1073741824
# 检查日志段的时间间隔,看是否能够根据保留策略删除它们
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
# 链接到Zookeeper的超时时间
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
复制代码
若有表述不当之处,敬请指正。