1、搭建Kafka环境一、下载解压-- 下载wget http://mirror.bit.edu.cn/apac..._2.11-2.2.0.tgz-- 解压tar -zxvf kafka_2.11-2.2.0.tgz-- 重命名mv kafka_2.11-2.2.0 kafka2.11二、启动Kafka服务git
kafka依赖ZooKeeper服务,须要本地安装并启动ZooKeeper。github
参考文章: Linux系统搭建ZooKeeper3.4中间件,经常使用命令总结
-- 执行位置-- /usr/local/mysoft/kafka2.11bin/kafka-server-start.shconfig/server.properties三、查看服务ps -aux|grep kafka四、开放地址端口-- 基础路径-- /usr/local/mysoft/kafka2.11/configvim server.properties-- 添加下面注释advertised.listeners=PLAINTEXT://192.168.72.130:90922、Kafka基础概念一、基础描述web
Kafka是由Apache开源,具备分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。一般用来搜集用户在应用服务中产生的动做日志数据,并高速的处理。日志类的数据须要高吞吐量的性能要求,对于像Hadoop同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是经过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了经过集群来提供实时的消息。spring
二、功能特色apache
(1)、经过磁盘数据结构提供消息的持久化,消息存储也可以保持长时间稳定性;bootstrap
(2)、高吞吐量,即便是很是普通的硬件Kafka也能够支持每秒超高的并发量;vim
(3)、支持经过Kafka服务器和消费机集群来分区消息;服务器
(4)、支持Hadoop并行数据加载;数据结构
(5)、API包封装的很是好,简单易用,上手快 ;并发
(6)、分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer;
三、消息功能
点对点模型一般是一个基于拉取或者轮询的消息传递模型,消费者主动拉取数据,消息收到后从队列移除消息,这种模型不是将消息推送到客户端,而是从队列中请求消息。特色是发送到队列的消息被一个且只有一个消费者接收处理,即便有多个消费者监听队列也是如此。
发布订阅模型则是一个基于推送的消息传送模型,消息产生后,推送给全部订阅者。发布订阅模型能够有多种不一样的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的全部消息,即便当前订阅者不可用,处于离线状态。
四、消息队列做用
五、专业术语简介
一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker能够容纳多个topic。
消息生产者,就是向kafka broker发消息的客户端。
消息消费者,向kafka broker取消息的客户端。
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,能够理解为一个队列。
每一个Consumer属于一个特定的Consumer Group,可为每一个Consumer指定group name,若不指定group name则属于默认的分组。
一个庞大大的topic能够分布到多个broker上,一个topic能够分为多个partition,每一个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的总体的顺序。Partition是物理上的概念,方便在集群中扩展,提升并发。
3、整合SpringBoot2框架一、案例结构
二、基础依赖<!-- SpringBoot依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafka 依赖 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.4.RELEASE</version></dependency>三、生产者配置spring: kafka: bootstrap-servers: 127.0.0.1:9092四、消息生成@RestControllerpublicclass ProducerWeb { @ResourceprivateKafkaTemplate<String, String> kafkaTemplate; @RequestMapping("/send") public String sendMsg () { MsgLog msgLog =newMsgLog(1,"消息生成", 1,"消息日志",newDate()) ; String msg = JSON.toJSONString(msgLog) ; // 这里Topic若是不存在,会自动建立 kafkaTemplate.send("cicada-topic", msg); returnmsg ; }}五、消费者配置spring: kafka: bootstrap-servers: 127.0.0.1:9092 consumer: group-id: test-consumer-group六、消息消费@Componentpublicclass ConsumerMsg { privatestaticLogger LOGGER = LoggerFactory.getLogger(ConsumerMsg.class); @KafkaListener(topics ="cicada-topic") public void listenMsg (ConsumerRecord<?,String> record) { String value = record.value(); LOGGER.info("ConsumerMsg====>>"+value); }}4、消息流程分析一、生产者分析
生产者基于推push推模式将消息发布到broker,每条消息都被追加到分区patition中,属于磁盘顺序写,效率比随机写内存要高,保障kafka高吞吐量。
消息发送时都被发送到一个topic,而topic是由Partition Logs(分区日志)组成,其组织结构以下图所示:
每一个Partition中的消息都是有序的,生产的消息被不断追加到Partitionlog上,其中的每个消息都被赋予了一个惟一的offset值。每一个Partition能够经过调整以适配它所在的机器,而一个topic又能够有多个Partition组成,所以整个集群就能够适应任意大小的数据。分区的原则:指定patition,则直接使用;未指定patition但指定key,经过对key的value进行hash出一个patition;patition和key都未指定,使用轮询选出一个patition。
二、消费者分析
消费者是以consumer group消费者组的方式工做,由一个或者多个消费者组成一个组,共同消费一个topic。每一个分区在同一时间只能由group中的一个消费者读取,可是多个group能够同时消费一个partition。
消费者采用pull拉模式从broker中读取数据。对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer能够本身控制消费方式——便可批量消费也可逐条消费,同时还能选择不一样的提交方式从而实现不一样的数据传输场景。
5、源代码地址GitHub·地址https://github.com/cicadasmil...·地址https://gitee.com/cicadasmile...