根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:html
Kafka目前主要做为一个分布式的发布订阅式的消息系统使用 下图为消息传输流程java
在kafka官网 http://kafka.apache.org/downloads下载到最新的kafka安装包,选择下载二进制版本的tgz文件spring
kafka解压目录下下有一个config的文件夹,里面放置的是咱们的配置文件apache
consumer.properites 消费者配置json
producer.properties 生产者配置bootstrap
server.properties kafka服务器的配置,此配置文件用来配置kafka服务器 目前仅介绍几个最基础的配置服务器
listeners=PLAINTEXT:// 192.168.180.128:9092
。并确保服务器的9092端口可以访问zookeeper.connect=localhost:2181
启动zookeeperapp
#前台启动 [root@CentOS124 home]# cd kafka2.11/ [root@CentOS124 kafka2.11]# bin/zookeeper-server-start.sh config/zookeeper.properties #后台启动 [root@CentOS124 kafka2.11]# bin/zookeeper-server-start.sh config/zookeeper.properties 1>/dev/null 2>&1 & [1] 18466 #查看是否启动成功 [root@CentOS124 ~]# ps -ef|grep kafka
启动kafka框架
[root@CentOS124 kafka2.11]# bin/kafka-server-start.sh config/server.properties #后台启动 [root@CentOS124 kafka2.11]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 & #建立 topic [root@CentOS124 kafka2.11]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test #查看Kafka 中的 topic 列表 bin/kafka-topics.sh --list --zookeeper localhost:2181
#复制server.properties配置文件为三份,分别起名为server.properties,server-2.properties,server-3.properties 三份配置中都要修改如下 #三个配置中分别修改成0,2,3 broker.id=0 #三个配置中分别修改成9092,9093,9094 port=9092 #kafka-logs,kafka-logs-2,kafka-logs-3 log.dirs=/tmp/kafka-logs #都设置为3,即每一个topic默认三个partition num.partitions=3 #zookeeper集群地址,外部能够配置,这里环境有限 使用默认既可 zookeeper.connect=localhost:2181 #分别进入kafka目录下 执行以下命令启动服务控制台输出日子完成了 bin/kafka-server-start.sh config/server.properties bin/kafka-server-start.sh config/server-2.properties bin/kafka-server-start.sh config/server-3.properties
首先建立一个springBoot项目 引入spring-kafka 分布式
application.properties 配置
server.port=8080 #kafka地址 brokers集群地址用,隔开 spring.kafka.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 #生产者的配置,大部分咱们可使用默认的,这里列出几个比较重要的属性 #每批次发送消息的数量 spring.kafka.producer.batch-size=16 #发送失败重试次数 spring.kafka.producer.retries=0 #即32MB的批处理缓冲区 spring.kafka.producer.buffer-memory=33554432 #key序列化方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #消费者的配置 ##Kafka中没有初始偏移或若是当前偏移在服务器上再也不存在时,默认区最新 ,有三个选项 【latest, earliest, none】 spring.kafka.consumer.auto-offset-reset=latest #是否开启自动提交 spring.kafka.consumer.enable-auto-commit=true #自动提交的时间间隔 spring.kafka.consumer.auto-commit-interval=100 #key的解码方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer #value的解码方式 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #在kafka/config文件的consumer.properties中有配置 spring.kafka.consumer.group-id=test-consumer-group
建立Producer生产者
package com.example.modules; import com.alibaba.fastjson.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import java.util.Date; /** * 〈生产者〉 * @author qinxuewu * @create 18/8/4下午11:56 * @since 1.0.0 */ @Component public class Producer { @Autowired private KafkaTemplate kafkaTemplate; //发送消息方法 public void send() { JSONObject obj=new JSONObject(); obj.put("id",System.currentTimeMillis()); obj.put("name","生产者发送消息"); obj.put("date",new Date()); //这个 topic 在 Java 程序中是不须要提早在 Kafka 中设置的,由于它会在发送的时候自动建立你设置的 topic kafkaTemplate.send("qxw",obj.toString()); } }
建立消费者
@Component public class Consumer { private static final Logger logger = LoggerFactory.getLogger(Consumer.class); /** * 同时监听两个 topic 的消息了,可同时监听多个topic * @param record * @throws Exception */ @KafkaListener(topics = {"test","qxw"}) public void listen (ConsumerRecord<?, ?> record) throws Exception { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); logger.info("消费者开始消费message:" + message); } } }
运行后就能够看到控制台输出了
@RunWith(SpringRunner.class) @SpringBootTest public class KafkaDemoApplicationTests { @Autowired private Producer producer; @Test public void contextLoads() { for (int i = 0; i <3 ; i++) { producer.send(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
https://www.cnblogs.com/alan319/p/8651434.html kafka的配置分为 broker、producter、consumer三个不一样的配置