是什么?有什么用?怎么用?java
1) Apache Kafka 是一个消息队列(生产者消费者模式)node
2) Apache Kafka 目标:构建企业中统一的、高通量的、低延时的消息平台。web
3) 大多的是消息队列(消息中间件)都是基于JMS标准实现的,Apache Kafka 相似于JMS的实现。算法
ActiveMQapache
1) 做为缓冲,来异构、解耦系统。json
用户注册须要完成多个步骤,每一个步骤执行都须要很长时间。表明用户等待时间是全部步骤的累计时间。bootstrap
为了减小用户等待的时间,使用并行执行执行,有多少个步骤,就开启多少个线程来执行。表明用户等待时间是全部步骤中耗时最长的那个步骤时间。api
有了新得问题:开启多线程执行每一个步骤,若是以一个步骤执行异常,或者严重超时,用户等待的时间就不可控了。服务器
经过消息队列来保证。多线程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-11Vt9Zzu-1591777000393)(/1544783011567.png)]
Kafka Cluster:由多个服务器组成。每一个服务器单独的名字broker(掮客)。
Kafka Producer:生产者、负责生产数据。
Kafka consumer:消费者、负责消费数据。
Kafka Topic: 主题,一类消息的名称。存储数据时将一类数据存放在某个topic下,消费数据也是消费一类数据。
订单系统:建立一个topic,叫作order。
用户系统:建立一个topic,叫作user。
商品系统:建立一个topic,叫作product。
注意:Kafka的元数据都是存放在zookeeper中。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9jjkyEgQ-1591777000396)(/1546867191336.png)]
本机也要加
192.168.72.141 node01 192.168.72.142 node02 192.168.72.143 node03
1)安装jdk、安装zookeeper
2)安装目录
安装包存放的目录:/export/software
安装程序存放的目录:/export/servers
数据目录:/export/data
日志目录:/export/logs
mkdir -p /export/servers/ mkdir -p /export/software/ mkdir -p /export/data/ mkdir -p /export/logs/
3)安装用户
安装hadoop,会建立一个hadoop用户
安装kafka,建立一个kafka用户
或者 建立bigdata用户,用来安装全部的大数据软件。
本例:使用root用户
4)验证环境
a) jdk环境
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-c3GiFZDj-1591777000400)(/1544866443419.png)]
b) zookeeper环境
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YcSUbXxH-1591777000407)(/1544786940692.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1kO6e8hB-1591777000409)(/1544787035414.png)]
因为kafka是scala语言编写的,基于scala的多个版本,kafka发布了多个版本。
其中2.11是推荐版本。
tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/ cd /export/servers/ mv kafka_2.11-1.0.0 kafka
1)删除以前的安装记录
2)解压文件
3)重命名
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NG6i6T5r-1591777000411)(/1544787057518.png)]
进入配置目录,查看server.properties文件
cat server.properties |grep -v “#”
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EQ7qc9Fc-1591777000412)(/1544787073410.png)]
经过以上命令,查看到了默认的配置文件,对默认的文件进行修改。
修改三个地方
vi server.properties
1) Borker.id
2) 数据存放的目录,注意目录若是不存在,须要新建下。
3) zookeeper的地址信息
#broker.id 标识了kafka集群中一个惟一broker。 broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 #存放生产者生产的数据 数据通常以topic的方式存放 log.dirs=/export/data/kafka num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 #zk的信息 zookeeper.connect=node01:2181,node02:2181,node03:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 #可选配置 delete.topic.enable=true host.name=192.168.72.141
将修改好的配置文件,分发到node02,node03上。
先在node0二、node03上删除以往的安装记录
# 建立一个数据存放目录 /export/data/kafka mkdir -p /export/data/kafka
分发安装包
scp -r /export/servers/kafka/ node02:/export/servers/ scp -r /export/servers/kafka/ node03:/export/servers/
修改node02上的broker.id
vi /export/servers/kafka/config/server.properties
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Er16x018-1591777000414)(/1544787262549.png)]
修改node03上的broker.id
vi /export/servers/kafka/config/server.properties
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MKoOTjAm-1591777000415)(/1544787309119.png)]
cd /export/servers/kafka/bin nohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2>&1 &
因为kafka集群并无UI界面能够查看。
须要借助外部工具,来查看kafka的集群
这个工具是一个java程序,必需要安装好JDK
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-M7qM7MdS-1591777000416)(/1544787326910.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zNjY7bmb-1591777000417)(/1544787370274.png)]
需求:订单系统,须要发送消息。 后面后3个程序须要接受这个消息,并作后续的处理。
1) 建立一个订单的topic。
./kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic order
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-R3M3am6C-1591777000419)(/1544787440943.png)]
2) 编写代码启动一个生产者,生产数据
./kafka-console-producer.sh --broker-list node01:9092 --topic order
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FwtPuN9h-1591777000421)(/1544787445692.png)]
3) 编写代码启动一个消费者,消费数据
./kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic order
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DNBFYJkT-1591777000422)(/1544787451882.png)]
1)java工程-maven,依赖。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> </dependency>
2) 编写代码-写生产者的代码
/** * 订单的生产者代码 */ public class OrderProducer { public static void main(String[] args) throws InterruptedException { /* 一、链接集群,经过配置文件的方式 * 二、发送数据-topic:order,value */ Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props); for (int i = 0; i < 1000; i++) { // 发送数据 ,须要一个producerRecord对象,最少参数 String topic, V value kafkaProducer.send(new ProducerRecord<String, String>("order", "订单信息!"+i)); Thread.sleep(100); } } }
3) 编写代码-写消费者的代码
/** * 消费订单数据--- javaben.tojson */ public class OrderConsumer { public static void main(String[] args) { // 一、链接集群 Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); //二、发送数据 发送数据须要,订阅下要消费的topic。 order kafkaConsumer.subscribe(Arrays.asList("order")); while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);// jdk queue offer插入、poll获取元素。 blockingqueue put插入原生,take获取元素 for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println("消费的数据为:" + record.value()); } } } }
若是本机没有修改hosts ,用命令的方式能够发送和接收数据,用java代码发送接收不了
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic order
分片:solrcloud中有说起到。
当数据量很是大的时候,一个服务器存放不了,就将数据分红两个或者多个部分,存放在多台服务器上。每一个服务器上的数据,叫作一个分片。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x8Tyhczg-1591777000423)(/1544789688464.png)]
副本:solrcloud中有说起到。
当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不一样的机器上。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4q1FKuLn-1591777000424)(/1544789701790.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7M7VdPKn-1591777000425)(/1544795613584.png)]
segment段中有两个核心的文件一个是log,一个是index。 当log文件等于1G时,新的会写入到下一个segment中。
经过下图中的数据,能够看到一个segment段差很少会存储70万条数据。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JzIFV44F-1591777000427)(/1544795630398.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yMZBa8RZ-1591777000428)(/1544795642211.png)]
kafka在数据生产的时候,有一个数据分发策略。默认的状况使用DefaultPartitioner.class类。
这个类中就定义数据分发的策略。
1) 若是是用户制定了partition,生产就不会调用DefaultPartitioner.partition()方法
数据分发策略的时候,能够指定数据发往哪一个partition。
当ProducerRecord 的构造参数中有partition的时候,就能够发送到对应partition上
/** * Creates a record to be sent to a specified topic and partition * * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent * @param key The key that will be included in the record * @param value The record contents */ public ProducerRecord(String topic, Integer partition, K key, V value) { this(topic, partition, null, key, value, null); }
2) 当用户指定key,使用hash算法。若是key一直不变,同一个key算出来的hash值是个固定值。若是是固定值,这种hash取模就没有意义。
Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
若是生产者没有指定partition,可是发送消息中有key,就key的hash值。
/** * Create a record to be sent to Kafka * * @param topic The topic the record will be appended to * @param key The key that will be included in the record * @param value The record contents */ public ProducerRecord(String topic, K key, V value) { this(topic, null, null, key, value, null); }
3) 当用既没有指定partition也没有key。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JegZyeLy-1591777000429)(/1544795702853.png)]
既没有指定partition,也没有key的状况下如何发送数据。
使用轮询的方式发送数据。
/** * Create a record with no key * * @param topic The topic this record should be sent to * @param value The record contents */ public ProducerRecord(String topic, V value) { this(topic, null, null, null, value, null); }
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wJDlqlbk-1591777000430)(/1544795787490.png)]
一个partition只能被一个组中的成员消费。
因此若是消费组中有多于partition数量的消费者,那么必定会有消费者没法消费数据。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qkDI4do7-1591777000431)(/1544789762719.png)]
1) 消息生产分为同步模式和异步模式
2) 消息确认分为三个状态
a) 0:生产者只负责发送数据
b) 1:某个partition的leader收到数据给出响应
c) -1:某个partition的全部副本都收到数据后给出响应
3) 在同步模式下
a) 生产者等待10S,若是broker没有给出ack响应,就认为失败。
b) 生产者重试3次,若是尚未响应,就报错。
4) 在异步模式下
a) 先将数据保存在生产者端的buffer中。Buffer大小是2万条。
b) 知足数据阈值或者数量阈值其中的一个条件就能够发送数据。
c) 发送一批数据的大小是500条。
若是broker迟迟不给ack,而buffer又满了。
开发者能够设置是否直接清空buffer中的数据。
broker端的消息不丢失,其实就是用partition副本机制来保证。
Producer ack -1. 可以保证全部的副本都同步好了数据。其中一台机器挂了,并不影像数据的完整性。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3pKJOTVq-1591777000432)(/1544795493081.png)]
只要记录offset值,消费者端不会存在消息丢失的可能。只会重复消费。
-1:某个partition的全部副本都收到数据后给出响应
3) 在同步模式下
a) 生产者等待10S,若是broker没有给出ack响应,就认为失败。
b) 生产者重试3次,若是尚未响应,就报错。
4) 在异步模式下
a) 先将数据保存在生产者端的buffer中。Buffer大小是2万条。
b) 知足数据阈值或者数量阈值其中的一个条件就能够发送数据。
c) 发送一批数据的大小是500条。
若是broker迟迟不给ack,而buffer又满了。
开发者能够设置是否直接清空buffer中的数据。
broker端的消息不丢失,其实就是用partition副本机制来保证。
Producer ack -1. 可以保证全部的副本都同步好了数据。其中一台机器挂了,并不影像数据的完整性。
[外链图片转存中…(img-3pKJOTVq-1591777000432)]
只要记录offset值,消费者端不会存在消息丢失的可能。只会重复消费。