Kafka是一个分布式消息系统,由LinkedIn使用Scala语言编写的,具备高水平扩展和高吞吐量 (每秒处理任务数)spring
目前流行的消息队列主要有三种:ActiveMQ、RabbitMQ、Kafkaapache
*其中ActiveMQ、RabbitMQ均支持AMQP协议,Kafka有其本身的协议 (仿AMQP,并不通用),但目前愈来愈多的开源分布式处理系统如Flume(日志收集系统)、Storm(实时数据处理系统)、Spark(大数据通用处理平台)、Elasticsearch(全文检索系统)都支持与Kafka的集成。bootstrap
*动态扩容:在不需中止服务的前提下动态的增长或减小消息队列服务器,Kafka的动态扩容是经过zookeeper实现的,zookeeper上保存着kafka的相关状态信息 (topic、partition等)缓存
1.网站活动追踪:用户的活动追踪、搜索、浏览、点击率等,将操做信息发布到不一样的主题中,可对数据实时监控,统计分析用户行为。安全
2.日志聚合:做为一种日志聚合的解决方案( 日志聚合的做用在于能够把来自不一样服务器上不一样应用程序产生的日志聚合起来,存放在单一的服务器上,方便进行搜索和分析)服务器
3.数据集中管理:分布式应用产生的数据统一存放在kafka集群中,集中式管理,供其余程序使用或后续对数据统计分析。session
AMQP(Advanced Message Queuing Protocol),高级消息队列协议是一个统一消息服务的应用层协议,为面向消息的中间件所设计,基于此协议的客户端与消息中间件可相互传递消息,并不受客户端和中间件的产品以及开发语言不一样所限制。app
生产者(Producer):往消息队列中发送消息的应用程序负载均衡
消费者(Consumer): 从消息队列中获取消息的应用程序异步
AMQP服务器(Broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列
*消息队列以broker为最小的运行单元,一个broker的运行就表明着一个Kafka应用程序实例。
*Kafka客户端支持的语言:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript,可使用任何一种语言和Kafka服务器进行通讯,编写本身的生产者与消费者客户端程序。
*broker中能够包含多个主题,每一个主题中又能够包含多个分区。
一个主题相似新闻中的体育、娱乐、教育等分类概念,在实际工程中一般一个业务一个主题。
一个topic中由一到多个分区组成,分区是Kafka结构的最小单元,一个分区就是一个FIFO(First In First Out)的队列,用于存放topic中的消息。
*Kafka的分区是提升Kafka性能的关键手段,当Kafka集群的性能不高时,能够试着往topic中添加分区。
、
*可见每一个分区都是一个先进先出的队列,producer往broker中的指定topic发送消息,消息将经过负载均衡策略进入到相应的partition中。
*每一个消息都有一个连续的序列号叫作offset(偏移量),是消息在分区中的惟一标识。
*每一个consumer都须要维护一个当前已消费消息的偏移量,相似于指针,随着consumer不断的读取消息,消费者的offset值也会不断的增长,consumer也能够以任意的顺序读取消息,只须要设置偏移量便可。
*每一个consumer读取的偏移量都会同步给Kafka,在Kafka集群中同时会维护各个consumer消费的偏移量(消息在分区中的偏移量是固定的,Consumer的偏移量是动态可变的,其至关于读取的指针)
*在一个可配置的时间段内,Kafka集群将保留全部发布的消息,无论这些消息是否被被消费。
*每一个分区在Kafka集群的若干服务中都有副本( 数量可配置 ),使Kafka具有了容错能力。
*在逻辑上相关的一组分区中,都由一个服务器做为leader,其他服务器做为follower,leader和follwer的选举是随机的,当follower接收到请求首先会发送给leader,由leader负责消息的读和写并将消息同步给各个follower,若是Leader所在节点宕机,followers中的一台则会自动成为leader。
Producer将消息发布到Broker的指定Topic中,消息将根据负载均衡策略进入到相应的Partition中。
Kafka中提供了Consumer组的概念,一个Consumer组中包含若干个Consumer,总体对外可当作是一个消费者。
若全部的消费者都在同一个consumer组中则成为队列模式,topic中各个分区的消息仅能被组中分区个的惟一consumer消费,组下的consumer共同竞争topic中的分区。
若全部的消费者都不在同一个consumer组中则成为发布-订阅模式,topic中各个分区的消息都会广播给全部的consumer组。
因为Kafka使用scala语言编写,scale语言运行在JVM中,所以须要先安装JDK而且配置好环境变量。
因为Kafka中的状态信息(topic、partition)都保存在zk上,虽然Kafka中自带zk,但通常是使用外置的zk集群,所以须要先安装zk服务而且配置好zk集群关系。
从Kafka官网中下载安装包并进行解压。
config是Kafka配置文件的存放目录
*因为多个Kafka服务(broker)都使用同一个zk集群,所以在同一个zk集群中的Kafka也就自动成为集群的关系,所以borker.id在同一个集群中不能重复。
*Kafka中的消息是缓存到本地磁盘的(log.dirs目录下),每一个topic的分区在broker的日志路径下都对应一个目录,目录下的.log文件用于存放分区中的消息,当有新消息进入分区时直接追加到文件中。
*若建立的topic其备份数大于1 (状态保存在zk) ,则Kafka集群中备份数个broker也会建立此topic,所以在其日志路径下也会存在此topic各个分区的目录。
*在使用Kafka提供的消费者脚本文件时能够指定其使用的配置文件。
*在程序中使用时须要手动设置配置项。
*在使用Kafka提供的生产者脚本文件时能够指定其使用的配置文件。
*在程序中使用时须要手动设置配置项。
逐一启动zk服务
逐一启动kafka服务并指定使用的配置文件( service.properties文件中配置使用外置的ZK集群)
建立名为chat的topic,其备份数为3而且每一个topic下存在3个分区 (因为启动了3个broker,所以topic的备份数最多只能是3)
建立topic时需指定zk服务地址,zk中保存了topic的相关属性(备份数和分区数),Kafka集群再从zk服务中获取topic属性信息并在Kafka集群中备份数个节点建立该topic。
查看各个broker中的日志目录,可见目录下都生成了chat-0、chat-一、chat2分别表示chat主题中的第一个、第二个、第三个分区,每一个分区中都有.log文件存放分区中的消息。
查看Kafka集群中chat主题下各个分区的状态
Topic:主题名称
PartitionCount:主题包含的分区数
ReplicationFactor:topic的备份数
Partition:分区号
Leader:充当leader的broker节点
Replicas:存在备份的broker节点(无论存活)
Isr:存在备份而且存活的broker节点
往Kafka集群中的chat主题发送消息
*消息将会根据负载均衡机制随机进入分区
*因为使用脚本文件启动消费者时没有指定使用的配置文件,因此三个消费者都不是同一个消费者组中,所以三个消费者都可以消费到chat主题中各个分区的消息。
*启动了三个消费者并指定使用的配置文件,默认的group.id是test-consumer-group,所以三个消费者都属于同一个消费者组中,topic中各个分区仅能被组下的惟一一个consumer消费。
*因为启动第一个消费者时,消费者组下只有一个消费者,所以消息都会被此消费者消费,当往消费者组中添加新的消费者而且生产者往主题添加消息时,此时消费者会从新竞争消息。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>0.11.0.1</version>
</dependency>
ZkUtils zkUtils = ZkUtils.apply("192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); // 建立一个名为chat的主题其包含2个分区,备份数是3
AdminUtils.createTopic(zkUtils, "chat", 2, 3, new Properties(), RackAwareMode.Enforced$.MODULE$); zkUtils.close();
//建立Properties对象用于封装配置项
Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //建立KafkaProducer生产者实例
Producer<String, String> producer = new KafkaProducer<>(props); //建立生产端消息实体ProducerRecord并指定消息上传的topic名称、消息的Key、消息的Value(消息由Key Value两部分组成)
ProducerRecord<String,String> record = new ProducerRecord<String,String>("topic","key","value"); //发送消息
producer.send(record); //关闭链接
producer.close();
*KafkaProducer是线程安全的,在线程之间能够共享单个生产者实例。
*send()方法是异步的,一旦消息被保存在待发送缓冲区中此方法就当即返回,其返回Future<RecordMetadata>实例,当调用该实例的get()方法时将会阻塞直到服务器对请求进行应答(阻塞时长跟acks配置项有关),当服务器处理异常时将抛出异常。
*消息由Key、Value两部分组成,Key值能够重复,使用Kafka API往topic发送消息时,默认状况下将会根据消息的Key值进行散列来决定消息到达的分区。
*生产者的缓冲区保留还没有发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群,若使用后不关闭生产者则会泄露这些资源。
//建立Properties对象用于封装配置项
Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092"); props.put("group.id", "consumerA"); //自动提交Consumer的偏移量给Kafka服务
props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //建立KafkaConsumer消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅主题,一个消费者实例能够订阅多个主题
consumer.subscribe(Arrays.asList("chat", "hello")); //接收数据,消息存放在ConsumerRecords消息集合中
ConsumerRecords<String, String> records = consumer.poll(1000*5);
//遍历消费端消息集合获取ConsumerRecord消费端消息实体,一个消费端消息实体包含偏移量、消息Key值、消息Value值
for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.2.0.RELEASE</version> </dependency>
@Configuration @EnableKafka public class KafkaConsumerConfiguration { //Consumer配置 private Map<String, Object> consumerProps() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "bootKafka"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } //Consumer工厂Bean @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerProps()); } //Kafka监听器工厂Bean @Bean public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
@Configuration @EnableKafka public class KafkaProducerConfiguration { //Producer配置 private Map<String, Object> senderProps (){ Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 1); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } //Producer工厂Bean @Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(senderProps()); } //kafkaTemplate,用于向Topic发送消息 @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory()); return template; } }
@Component public class Producer { @Autowired private KafkaTemplate kafkaTemplate; public void send() throws Exception { kafkaTemplate.send("fruit", "apple"); } }
@Component public class Consumer { @KafkaListener(topics = "fruit") public void listen(String msgData) { } }