近期在作 SOFA 与 SpringCloud 的集成,但愿经过一系列的 DEMO 工程去帮助你们更好的使用 SOFA 和 SpringCloud;同时也但愿你们一块儿来参与共建和 star。java
GitHub传送门:spring-cloud-sofastack-sampleslinux
官方网站:https://kafka.apache.org/git
Apache Kafka™ 是 一个分布式数据流平台,从官方文档的解释来看,其职能大致以下:github
做为一个后端司机,大多数状况下都是把 Kafka 做为一个分布式消息队列来使用的,分布式消息队列能够提供应用解耦、流量消峰、消息分发等功能,已是大型互联网服务架构不可缺乏的基础设置了。spring
Kafka 对数据提供的核心抽象,topic 是发布的数据流的类别或名称。topic 在 Kafka 中,支持多订阅者; 也就是说,topic 能够有零个、一个或多个消费者订阅写到相应 topic 的数据。对应每个 topic,Kafka 集群会维护像一个以下这样的分区的日志: apache
日志中的 Partition 有几个目的:bootstrap
原贴:kafka中的topic为何要进行分区 ,因为不能转载,此处不摘抄原文~vim
生产者将数据发布到他们选择的 topic , 生产者负责选择要吧数据分配给 topic 中哪一个 Partition。这能够经过循环方式(round-robin)简单地平衡负载,或者能够根据某些语义进行分区(例如基于数据中的某些关键字)来完成。后端
消费者们使用消费群组(consumer group )名称来标注本身,几个消费者共享一个 group,每个发布到 topic 的数据会被传递到每一个消费群组(consumer group )中的一个消费者实例。 消费者实例能够在不一样的进程中或不一样的机器上。bash
若是全部的消费者实例具备相同的 consumer group,则记录将在全部的消费者实例上有效地负载平衡
若是全部的消费者实例都有不一样的 consumer group,那么每一个记录将被广播给全部的消费者进程,每一个数据都发到了全部的消费者。
上图解释源自《Kafka 官方文档》 介绍:
如上图,一个两个服务器节点的Kafka集群, 托管着4个分区(P0-P3),分为两个消费者群. 消费者群A有2个消费者实例,消费者群B有4个. 然而,更常见的是,咱们发现主题具备少许的消费者群,每一个消费者群表明一个“逻辑订户”。每一个组由许多消费者实例组成,保证可扩展性和容错能力。这能够说是“发布-订阅”语义,但用户是一组消费者而不是单个进程。 在Kafka中实现消费的方式,是经过将日志中的分区均分到消费者实例上,以便每一个实例在任什么时候间都是“相应大小的一块”分区的惟一消费者。维护消费者组成员资格的过程,由卡夫卡协议动态处理。 若是新的实例加入组,他们将从组中的其余成员接管一些分区; 若是一个实例消失,其分区将被分发到剩余的实例。 Kafka仅提供单个分区内的记录的顺序,而不是主题中的不一样分区之间的总顺序。 每一个分区排序结合按键分区,足以知足大多数应用程序的需求。 可是,若是您须要使用总顺序,则能够经过仅具备一个分区的主题来实现,尽管这仅意味着每一个消费者组只有一个消费者进程。
本篇只介绍 Kafka 做为消息队列的一些基本概念,更多介绍请参考官方文档。
这里来看下如何安装 kafka,下载地址:https://kafka.apache.org/downloads。本篇使用的版本是 kafka_2.12-1.1.1。
获取包文件
> wget http://mirrors.shu.edu.cn/apache/kafka/1.1.1/kafka_2.12-1.1.1.tgz
复制代码
解压压缩包
> tar -zxvf kafka_2.12-1.1.1.tgz
复制代码
修改配置文件
> cd kafka_2.12-1.1.1/config
> vim server.properties
复制代码
我这里主要修改项包括如下几个:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
listeners=PLAINTEXT://192.168.0.1:9092
advertised.listeners=PLAINTEXT://192.168.0.1:9092
# zookeeper 地址,能够多个
zookeeper.connect=192.168.0.6:2181
复制代码
Kafka 服务启动须要依赖 Zookeeper ,因此在配置文件中须要指定 Zookeeper 集群地址。Kafka 本身的安装包中解压以后是包括 Zookeeper 的,能够经过如下的方式来启动一个单节点 Zookeeper 实例:
> sh zookeeper-server-start.sh -daemon config/zookeeper.properties
复制代码
这里我是指定了以前部署的一台ZK机器,因此能够直接将ZK地址指到已部署好的地址。Zookeeper 安装能够参考: Linux 下安装 Zookeeper
经过上述操做,下面就能够直接来启动Kafka 服务了:
> sh kafka-server-start.sh config/server.properties
复制代码
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version><!--$NO-MVN-MAN-VER$-->
</dependency>
复制代码
为了能够把 Kafka 封装已提供给其余模块使用,你们能够将 Kafka 的生产端工具类使用 SpringBoot 的自动配置机制进行包装,以下:
@Configuration
public class KafkaProducerAutoConfiguration {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Bean
public KafkaSender kafkaSender(){
return new KafkaSender(kafkaTemplate);
}
}
复制代码
public class KafkaSender {
private KafkaTemplate<String, String> kafkaTemplate;
public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/** * send message */
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
复制代码
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.sofastack.cloud.core.kafka.configuration.KafkaProducerAutoConfiguration
复制代码
工程模块以下: image-20190306151759441.png
在测试工程中引入依赖,这个依赖就是上面工程打包来的:
<dependency>
<groupId>io.sofastack.cloud</groupId>
<artifactId>sofastack-cloud-core-kafka</artifactId>
</dependency>
复制代码
#============== kafka ===================
# 指定kafka 代理地址,能够多个,这里的192.168.0.1是上面Kafka 启动配置文件中对应的
# 注:网上一些帖子中说 Kafka 这里的配置只能是主机名,不支持 ip,没有验证过,
# 若是您在验证时出现问题,能够尝试本机绑定下 host
spring.kafka.bootstrap-servers= 192.168.0.1:9092
#=============== provider =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100ms
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.application.name=kafka-test
logging.path=./logs
复制代码
@SpringBootApplication
@PropertySource("classpath:application-kafka.properties")
public class ProviderApplication {
public static void main(String[] args) {
ConfigurableApplicationContext run = SpringApplication.run(ProviderApplication.class, args);
// 这里经过容器获取,正常使用状况下,能够直接使用 Autowired 注入
KafkaSender bean = run.getBean(KafkaSender.class);
for (int i = 0; i < 3; i++) {
//调用消息发送类中的消息发送方法
bean.sendMessage(KafkaContants.TRADE_TOPIC, "send a test message");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
复制代码
@Component
public class KafkaReceiver {
// 配置监听的主体,groupId 和配置文件中的保持一致
@KafkaListener(topics = { KafkaContants.TRADE_TOPIC }, groupId = "test-consumer-group")
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println(message);
}
}
}
复制代码
启动工程后,能够在控制台看下消费者打印的信息:
> sh kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic trading
复制代码
执行上述命令以后,命令行将会等待输入,这里输入前后输入 glmapper 和 sofa :