Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统1、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它做为企业级基础设施来处理流式数据很是有价值。此外,Kafka能够经过Kafka Connect链接到外部系统(用于数据输入/输出),并提供了Kafka Streams——一个Java流式处理库。该设计受事务日志的影响较大。java
Kafka是一个分布式数据流平台,能够运行在单台服务器上,也能够在多台服务器上部署造成集群。它提供了发布和订阅功能,使用者能够发送数据到Kafka中,也能够从Kafka中读取数据(以便进行后续的处理)。Kafka具备高吞吐、低延迟、高容错等特色。下面介绍一下Kafka中经常使用的基本概念:node
Broker 消息队列中经常使用的概念,在Kafka中指部署了Kafka实例的服务器节点。linux
Topic 用来区分不一样类型信息的主题。好比应用程序A订阅了主题t1,应用程序B订阅了主题t2而没有订阅t1,那么发送到主题t1中的数据将只能被应用程序A读到,而不会被应用程序B读到。git
Partition 每一个topic能够有一个或多个partition(分区)。分区是在物理层面上的,不一样的分区对应着不一样的数据文件。Kafka使用分区支持物理上的并发写入和读取,从而大大提升了吞吐量。github
Record 实际写入Kafka中并能够被读取的消息记录。每一个record包含了key、value和timestamp。web
Producer 生产者,用来向Kafka中发送数据(record)。spring
Consumer 消费者,用来读取Kafka中的数据(record)。apache
Consumer Group 一个消费者组能够包含一个或多个消费者。使用多分区+多消费者方式能够极大提升数据下游的处理速度。bootstrap
Topic(主题): 每一条发送到kafka集群的消息均可以有一个类别,这个类别叫作topic,不一样的消息会进行分开存储,若是topic很大,能够分布到多个broker上,也能够这样理解:topic被认为是一个队列,每一条消息都必须指定它的topic,能够说咱们须要明确把消息放入哪个队列。对于传统的message queue而言,通常会删除已经被消费的消息,而Kafka集群会保留全部的消息,不管其被消费与否。固然,由于磁盘限制,不可能永久保留全部数据(实际上也不必),所以Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。vim
Broker(代理): 一台kafka服务器就能够称之为broker.一个集群由多个broker组成,一个broker能够有多个topic
Partition(分区): 为了使得kafka吞吐量线性提升,物理上把topic分红一个或者多个分区,每个分区是一个有序的队列。且每个分区在物理上都对应着一个文件夹,该文件夹下存储这个分区全部消息和索引文件。 分区的表示: topic名字-分区的id每一个日志文件都是一个Log Entry序列,每一个Log Entry包含一个4字节整型数值(值为M+5),1个字节的"magic value",4个字节的CRC校验码,而后跟M个字节的消息这个log entries并不是由一个文件构成,而是分红多个segment,每一个segment以该segment第一条消息的offset命名并以“.kafka”为后缀。另外会有一个索引文件,它标明了每一个segment下包含的log entry的offset范围分区中每条消息都有一个当前Partition下惟一的64字节的offset,它指明了这条消息的起始位置,Kafka只保证一个分区的数据顺序发送给消费者,而不保证整个topic里多个分区之间的顺序
Replicas(副本): 试想:一旦某一个Broker宕机,则其上全部的Partition数据都不可被消费,因此须要对分区备份。其中一个宕机后其它Replica必需要能继续服务而且即不能形成数据重复也不能形成数据丢失。 若是没有一个Leader,全部Replica均可同时读/写数据,那就须要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性很是难保证,大大增长了Replication实现的复杂性,同时也增长了出现异常的概率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。 每个分区,根据复制因子N,会有N个副本,好比在broker1上有一个topic,分区为topic-1, 复制因子为2,那么在两个broker的数据目录里,就都有一个topic-1,其中一个是leader,一个replicas同一个Partition可能会有多个Replica,而这时须要在这些Replication之间选出一个Leader,Producer和Consumer只与这个Leader交互,其它Replica做为Follower从Leader中复制数据
Producer: Producer将消息发布到指定的topic中,同时,producer还须要指定该消息属于哪一个partition
Consumer: 本质上kafka只支持topic,每个consumer属于一个consumer group,每一个consumer group能够包含多个consumer。发送到topic的消息只会被订阅该topic的每一个group中的一个consumer消费。若是全部的consumer都具备相同的group,这种状况和queue很类似,消息将会在consumer之间均衡分配;若是全部的consumer都在不一样的group中,这种状况就是广播模式,消息会被发送到全部订阅该topic的group中,那么全部的consumer都会消费到该消息。kafka的设计原理决定,对于同一个topic,同一个group中consumer的数量不能多于partition的数量,不然就会有consumer没法获取到消息。
Offset: Offset专指Partition以及User Group而言,记录某个user group在某个partiton中当前已经消费到达的位置。
目前主流使用场景基本以下:
消息队列(MQ) 在系统架构设计中,常常会使用消息队列(Message Queue)——MQ。MQ是一种跨进程的通讯机制,用于上下游的消息传递,使用MQ可使上下游解耦,消息发送上游只须要依赖MQ,逻辑上和物理上都不须要依赖其余下游服务。MQ的常见使用场景如流量削峰、数据驱动的任务依赖等等。在MQ领域,除了Kafka外还有传统的消息队列如ActiveMQ和RabbitMQ等。
追踪网站活动 Kafka最出就是被设计用来进行网站活动(好比PV、UV、搜索记录等)的追踪。能够将不一样的活动放入不一样的主题,供后续的实时计算、实时监控等程序使用,也能够将数据导入到数据仓库中进行后续的离线处理和生成报表等。
Metrics Kafka常常被用来传输监控数据。主要用来聚合分布式应用程序的统计数据,将数据集中后进行统一的分析和展现等。
日志聚合 不少人使用Kafka做为日志聚合的解决方案。日志聚合一般指将不一样服务器上的日志收集起来并放入一个日志中心,好比一台文件服务器或者HDFS中的一个目录,供后续进行分析处理。相比于Flume和Scribe等日志聚合工具,Kafka具备更出色的性能。
因为kafka依赖zookeeper环境因此先安装zookeeper,zk安装
安装环境
linux: CentSO-7.5_x64
java: jdk1.8.0_191
zookeeper: zookeeper3.4.10
kafka: kafka_2.11-2.0.1
复制代码
# 下载
$ wget http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
# 解压
$ tar -zxvf kafka_2.11-2.1.0.tgz
# 编辑配置文件修改一下几个配置
$ vim $KAFKA_HOME/config/server.properties
# 每台服务器的broker.id都不能相同只能是数字
broker.id=1
# 修改成你的服务器的ip或主机名
advertised.listeners=PLAINTEXT://node-1:9092
# 设置zookeeper的链接端口,将下面的ip修改成你的IP称或主机名
zookeeper.connect=node-1:2181,node-2:2181,node-3:2181
复制代码
$ cd $KAFKA_HOME
# 分别在每一个节点启动kafka服务(-daemon表示在后台运行)
$ bin/kafka-server-start.sh -daemon config/server.properties
# 建立一个名词为 test-topic 的 Topic,partitions 表示分区数量为3 --replication-factor 表示副本数量为2
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test-topic
# 查看topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
# 查看topic状态
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic
# 查看topic详细信息
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic
# 修改topic信息
$ bin/kafka-topics.sh --alter --topic test-topic --zookeeper localhost:2181 --partitions 5
# 删除topic(简单的删除,只是标记删除)
$ bin/kafka-topics.sh --delete --topic test-topic --zookeeper localhost:2181
# 在一台服务器上建立一个 producer (生产者)
$ bin/kafka-console-producer.sh --broker-list node-1:9092,node-2:9092,node-3:9092 --topic test-topic
# 在一台服务器上建立一个 consumer (消费者)
$ bin/kafka-console-consumer.sh --bootstrap-server node-2:9092,node-3:9092,node-4:9092 --topic test-topic --from-beginning
# 如今能够在生产者的控制台输入任意字符就能够看到消费者端有消费消息。
复制代码
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
复制代码
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
/**
* <p>
*
* @author leone
* @since 2018-12-26
**/
public class JavaKafkaConsumer {
private static Logger logger = LoggerFactory.getLogger(JavaKafkaConsumer.class);
private static Producer<String, String> producer;
private final static String TOPIC = "kafka-test-topic";
private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";
private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";
private static Properties properties;
static {
properties = new Properties();
properties.put("bootstrap.servers", KAFKA_BROKER);
properties.put("group.id", "test");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
}
public static void main(String[] args) {
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
}
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
// 将偏移设置到最开始
consumer.seekToBeginning(collection);
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
logger.info("offset: {}, key: {}, value: {}", record.offset(), record.key(), record.value());
}
}
}
}
复制代码
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.UUID;
/**
* <p>
*
* @author leone
* @since 2018-12-26
**/
public class JavaKafkaProducer {
private static Logger logger = LoggerFactory.getLogger(JavaKafkaProducer.class);
private static Producer<String, String> producer;
private final static String TOPIC = "kafka-test-topic";
private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";
private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";
private static Properties properties;
static {
properties = new Properties();
properties.put("bootstrap.servers", KAFKA_BROKER);
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", StringSerializer.class.getName());
}
public static void main(String[] args) {
Producer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 200; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String uuid = UUID.randomUUID().toString();
producer.send(new ProducerRecord<>(TOPIC, Integer.toString(i), uuid));
logger.info("send message success key: {}, value: {}", i, uuid);
}
producer.close();
}
}
复制代码
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import java.util.*;
/**
* <p>
*
* @author leone
* @since 2018-12-26
**/
public class KafkaClient {
private final static String TOPIC = "kafka-test-topic";
private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";
private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";
private static Properties properties = new Properties();
static {
properties.put("bootstrap.servers", KAFKA_BROKER);
}
/**
* 建立topic
*/
@Test
public void createTopic() {
AdminClient adminClient = AdminClient.create(properties);
List<NewTopic> newTopics = Arrays.asList(new NewTopic(TOPIC, 1, (short) 1));
CreateTopicsResult result = adminClient.createTopics(newTopics);
try {
result.all().get();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 建立topic
*/
@Test
public void create() {
ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
// 建立一个3个分区2个副本名为t1的topic
AdminUtils.createTopic(zkUtils, "t1", 3, 2, new Properties(), RackAwareMode.Enforced$.MODULE$);
zkUtils.close();
}
/**
* 查询topic
*/
@Test
public void listTopic() {
ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
// 获取 topic 全部属性
Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "streaming-topic");
Iterator it = props.entrySet().iterator();
while (it.hasNext()) {
Map.Entry entry = (Map.Entry) it.next();
System.err.println(entry.getKey() + " = " + entry.getValue());
}
zkUtils.close();
}
/**
* 修改topic
*/
@Test
public void updateTopic() {
ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "log-test");
// 增长topic级别属性
props.put("min.cleanable.dirty.ratio", "0.4");
// 删除topic级别属性
props.remove("max.message.bytes");
// 修改topic 'test'的属性
AdminUtils.changeTopicConfig(zkUtils, "log-test", props);
zkUtils.close();
}
/**
* 删除topic 't1'
*/
@Test
public void deleteTopic() {
ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
AdminUtils.deleteTopic(zkUtils, "t1");
zkUtils.close();
}
}
复制代码
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
复制代码
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>spring-boot-kafka</artifactId>
<groupId>com.andy</groupId>
<version>1.0.7.RELEASE</version>
<packaging>jar</packaging>
<modelVersion>4.0.0</modelVersion>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.spring.platform</groupId>
<artifactId>platform-bom</artifactId>
<version>Cairo-SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.0.3.RELEASE</version>
<configuration>
<!--<mainClass>${start-class}</mainClass>-->
<layout>ZIP</layout>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
复制代码
spring:
application:
name: spring-jms
kafka:
bootstrap-servers: node-2:9092,node-3:9092,node-4:9092
producer:
retries:
batch-size: 16384
buffer-memory: 33554432
compressionType: snappy
acks: all
consumer:
group-id: 0
auto-offset-reset: earliest
enable-auto-commit: true
复制代码
/**
* <p>
*
* @author leone
* @since 2018-12-26
**/
@ToString
public class Message<T> {
private Long id;
private T message;
private Date time;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public T getMessage() {
return message;
}
public void setMessage(T message) {
this.message = message;
}
public Date getTime() {
return time;
}
public void setTime(Date time) {
this.time = time;
}
}
复制代码
import com.andy.jms.kafka.service.KafkaSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* <p>
*
* @author leone
* @since 2018-12-26
**/
@Slf4j
@RestController
public class KafkaController {
@Autowired
private KafkaSender kafkaSender;
@GetMapping("/kafka/{topic}")
public String send(@PathVariable("topic") String topic, @RequestParam String message) {
kafkaSender.send(topic, message);
return "success";
}
}
复制代码
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* <p>
*
* @author leone
* @since 2018-12-26
**/
@Slf4j
@Component
public class KafkaReceiver {
@KafkaListener(topics = {"order"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("record:{}", record);
log.info("message:{}", message);
}
}
}
复制代码
import com.andy.jms.kafka.commen.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* <p>
*
* @author leone
* @since 2018-12-26
**/
@Slf4j
@Component
public class KafkaSender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
/**
*
* @param topic
* @param body
*/
public void send(String topic, Object body) {
Message<String> message = new Message<>();
message.setId(System.currentTimeMillis());
message.setMessage(body.toString());
message.setTime(new Date());
String content = null;
try {
content = objectMapper.writeValueAsString(message);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, content);
log.info("send {} to {} success!", message, topic);
}
}
复制代码
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Leone
* @since 2018-04-10
**/
@SpringBootApplication
public class JmsApplication {
public static void main(String[] args) {
SpringApplication.run(JmsApplication.class, args);
}
}
复制代码