SpringCloud的消息服务:SpringCloudStream

源代码:https://gitee.com/jiaodacailei/SpringCloudStream.git

1.基础知识

消息服务器作用:
一对多通知
并行转串行,削峰,削流

JMS - Java Message Service
Java定义的消息模型或者规范,有一套标准的Java API访问消息服务器,类似于jdbc访问数据库
消息服务器产品:
ActiveMQ/RabbitMQ/RocketMQ/Kafka
ActiveMQ/RabbitMQ/RocketMQ实现了JMS
Kafka没有实现JMS

SpringCloud Stream:SpringCloud的消息服务接口,可以使用它访问:
RabbitMQ/Kafka

Kafka需要Zookeeper作为数据存储。

2.安装Kafka

官网下载:http://kafka.apache.org/downloads
安装和使用:http://kafka.apache.org/quickstart
课程使用:kafka_2.12-2.2.0.zip
链接:https://pan.baidu.com/s/1Yy0BigcXRhFblVIrdtvRmw
提取码:q4o3

2.1.解压

1.png

2.2.启动Kafka

点击start.bat
会启动2个cmd窗口:
2.png
使用,请参考zip包中的windows安装指南.txt

2.3.创建主题

启动一个cmd窗口,切换目录到kafka解压目录:
3.png
执行命令:
bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic myTopic
4.png
执行如下命令查询新建的主题:
bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

5.png

2.4.发送消息

输入命令:bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic
输入消息内容:abc
6.png

2.5.接收消息

另外启动一个新的cmd窗口,切换目录到kafka解压目录:
7.png
执行命令:
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic myTopic --from-beginning
8.png
启动之后,可以看到上面生产者发送的abc消息。
生产者继续发送abcd消息,消费者可以立即收到。

3.SpringCloud-Stream消息模型

Source:发射器
Channel:频道
Binder:绑定器
Sink:接收器
9.png

消费者分组:
假设下图中服务A的3个实例为一个分组,服务B的2个实例为一个分组,则消息过来之后,只会有一个服务A的实例和服务B的实例处理该消息,其他实例不会收到消息。
10.png

4.使用默认频道进行消息发送和接收

4.1.创建项目

4.1.1.父项目springcloud-stream

\springcloud-stream\pom.xml
11.png

4.1.2.创建子项目

\springcloud-stream\springcloud-stream-producer
\springcloud-stream\springcloud-stream-consumer
配置:
/springcloud-stream-producer/pom.xml
12.png
/springcloud-stream-consumer/pom.xml
13.png

4.2.发送消息

4.2.1.创建消息发送者类

/springcloud-stream-producer/src/main/java/com/qfedu/demo/springcloud/stream/StreamProducer.java
14.png

4.2.2.配置频道及kafka

/springcloud-stream-producer/src/main/resources/application.yml
15.png

4.3.接收消息

4.3.1.创建消息接收者类

/springcloud-stream-consumer/src/main/java/com/qfedu/demo/springcloud/stream/StreamConsumer.java
16.png

4.3.2.配置频道及kafka

4.4.测试

参照第2节,启动kafka服务器
启动生产者
启动消费者
访问如下路径,发送消息qfedu2:
17.png
消息接收:
18.png

5.自定义频道发送和接收消息

5.1.消息生产者

5.1.1.自定义消息输出频道

/springcloud-stream-producer/src/main/java/com/qfedu/demo/springcloud/stream/QfOutputChannel.java
19.png

5.1.2.配置spring

映射输出频道到kafka主题
/springcloud-stream-producer/src/main/resources/application.yml
20.png

5.1.3.启动类

绑定发射器,并增加发送消息代码
/springcloud-stream-producer/src/main/java/com/qfedu/demo/springcloud/stream/StreamProducer.java
21.png

5.2.消息消费者

5.2.1.自定义消息输入频道

/springcloud-stream-consumer/src/main/java/com/qfedu/demo/springcloud/stream/QfInputChannel.java
22.png

5.2.2.配置spring

/springcloud-stream-consumer/src/main/resources/application.yml
23.png

5.2.3.启动类

/springcloud-stream-consumer/src/main/java/com/qfedu/demo/springcloud/stream/StreamConsumer.java
24.png

5.3.测试

25.png

26.png