时间:2017年07月22日星期六
说明:本文部份内容均来自慕课网。@慕课网:http://www.imooc.com
教学源码:无
学习源码:https://github.com/zccodere/s...java
Java消息中间件(入门篇)git
为何须要使用消息中间件 消息中间件概述 JMS规范 JMS代码演练
Java消息中间件(拓展篇)github
ActiveMQ集群配置 消息中间件在大型系统中的最佳实践 使用其它消息中间件
经过服务调用让其它系统感知事件发生web
系统之间高耦合 程序执行效率低
经过消息中间件解耦服务调用spring
生活中的案例apache
微信公众号 老师在黑板上写字 电视机 等等
消息中间件带来的好处vim
解耦:系统解耦 异步:异步执行 横向扩展 安全可靠 顺序保证
横向扩展解释安全
当登陆系统,须要不少用户登陆。这些消息所有须要告知积分系统,去增长积分,而增长积分这个处理过程可能比较麻烦、比较耗时。这个时候,能够启动多台积分系统,来同时消费这个消息中间件里面的登陆消息,达到横向扩展的做用。
什么是中间件服务器
非底层操做系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件
什么是消息中间件微信
关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统
示意图
什么是JMS
Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
什么是AMQP
AMQP(Advanced Message Queuing Protocol)是一个提供统一消息服务的应用层标准高级消息队列协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不一样产品,不一样的开发语言等条件的限制。
JMS和AMQP对比
ActiveMQ
ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个彻底支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已是好久的事情了,可是JMS在当今J2EE应用中间件仍然扮演者特殊的地位。
ActiveMQ特性
多种语言和协议编写客户端。 语言:Java、C、C++、C#、Ruby、Perl、Python、PHP 应用协议:OpenWire、Stomp、REST、WS Notification、XMPP、AMQP 彻底支持JMS1.1和J2EE1.4规范(持久化、XA消息、事务) 虚拟主题、组合目的、镜像队列
RabbitMQ
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ特性
支持多种客户端 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等 AMQP的完整实现(vhost、Exchange、Binding、Routing Key等) 事务支持/发布确认 消息持久化
Kafka
Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式、分区的、可高的分布式日志存储服务。它经过一种独一无二的设计提供了一个消息系统的功能。
Kafka特性
经过O(1)的磁盘数据结构提供消息的持久化, 这种结构对于即便数以TB的消息存储也可以保持长时间的稳定性能 高吞吐量:即便是很是普通的硬件Kafka也能够支持每秒数百万的消息 Partition、Consumer Group
综合评价
Java消息服务定义
Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。
JMS相关概念
提供者:实现JMS规范的消息中间件服务器 客户端:发送或接收消息的应用程序 生产者/发布者:建立并发送消息的客户端 消费者/订阅者:接收并处理消息的客户端 消息:应用程序之间传递的数据内容 消息模式:在客户端之间传递消息的模式,JMS中定义了主题和队列两种模式
JMS消息模式:队列模式
客户端包括生产者和消费者 队列中的消息只能被一个消费者消费 消费者能够随时消费队列中的消息
队列模型示意图
JMS消息模式:主题模型
客户端包括发布者和订阅者 主题中的消息被全部订阅者消费 消费者不能消费订阅以前就发送到主题中的消息
主题模型示意图
JMS编码接口
ConnectionFactory:用于建立链接到消息中间件的链接工厂 Connection:表明了应用程序和消息服务器之间的通讯链路 Destination:指消息发布和接收的地点,包括队列和主题 Session:表示一个单线程的上下文,用于发送和接收消息 MessageConsumer:由会话建立,用户接收发送到目标的消息 MessageProducer:由会话建立,用于发送消息到目标 Message:是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体
JMS编码接口之间的关系
在Windows安装ActiveMQ
下载安装包 直接启动 使用服务启动
安装验证
访问地址:http://127.0.0.1:8161/ 默认用户:admin 默认密码:admin
在Linux安装ActiveMQ
下载并解压安装包 启动
启动验证
进入到bin目录,使用命令./activemq start启动服务 使用命令ps -ef |grep activemq查看进程是否存在 使用命令./activemq stop关闭服务
安装验证
访问地址:http://Linux主机IP:8161/ 默认用户:admin 默认密码:admin
使用JMS接口规范链接ActiveMQ
建立生产者 建立消费者 建立发布者 建立订阅者
回顾JMS编码接口之间的关系
代码演示
1.编写AppProducer类
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 生产者-队列模式 * @author ZhangCheng on 2017-07-22 * */ public class AppProducer { /** 指定ActiveMQ服务的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定队列的名称 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.启动链接 connection.start(); // 4.建立会话(第一个参数:是否在事务中处理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 建立一个目标 Destination destination = session.createQueue(QUEUE_NAME); // 6.建立一个生产者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.建立消息 TextMessage textMessage = session.createTextMessage("test" + i); // 8.发布消息 producer.send(textMessage); System.out.println("消息发送:" + textMessage.getText()); } // 9.关闭链接 connection.close(); } }
2.编写AppConsumer类
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 消费者-队列模式 * @author ZhangCheng on 2017-07-22 * */ public class AppConsumer { /** 指定ActiveMQ服务的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定队列的名称 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.启动链接 connection.start(); // 4.建立会话(第一个参数:是否在事务中处理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.建立一个目标 Destination destination = session.createQueue(QUEUE_NAME); // 6.建立一个消费者 MessageConsumer consumer = session.createConsumer(destination); // 7.建立一个监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { System.out.println("接收消息异常:"); e.printStackTrace(); } } }); // 8.关闭链接 //connection.close(); } }
代码演示
1.编写AppProducer类
package com.myimooc.jms.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 生产者-主题模式 * @author ZhangCheng on 2017-07-22 * */ public class AppProducer { /** 指定ActiveMQ服务的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定主题的名称 */ private static final String TOPIC_NAME = "topic-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.启动链接 connection.start(); // 4.建立会话(第一个参数:是否在事务中处理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 建立一个目标 Destination destination = session.createTopic(TOPIC_NAME); // 6.建立一个生产者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.建立消息 TextMessage textMessage = session.createTextMessage("test" + i); // 8.发布消息 producer.send(textMessage); System.out.println("消息发送:" + textMessage.getText()); } // 9.关闭链接 connection.close(); } }
2.编写AppConsumer类
package com.myimooc.jms.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 消费者-主题模式 * @author ZhangCheng on 2017-07-22 * */ public class AppConsumer { /** 指定ActiveMQ服务的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定主题的名称 */ private static final String TOPIC_NAME = "topic-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.启动链接 connection.start(); // 4.建立会话(第一个参数:是否在事务中处理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.建立一个目标 Destination destination = session.createTopic(TOPIC_NAME); // 6.建立一个消费者 MessageConsumer consumer = session.createConsumer(destination); // 7.建立一个监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { System.out.println("接收消息异常:"); e.printStackTrace(); } } }); // 8.关闭链接 //connection.close(); } }
使用Spring集成JMS链接ActiveMQ
ConnectionFactory:用于管理链接的链接工厂 JmsTemplate:用于发送和接收消息的模版类 MessageListener:消息监听器
ConnectionFactory
一个Spring为咱们提供的链接池 JmsTemplate每次发消息都会从新建立链接,会话和productor Spring中提供了SingleConnectFactory和CachingConnectionFactory
JmsTemplate
是Spring提供的,只需向Spring容器内注册这个类就可使用JmsTemplate方便的操做jms JmsTemplate类是线程安全的,能够在整个应用范围使用
MessageListener
实现一个onMessage方法,该方法只接收一个Message参数
代码演示
1.建立名为jmsspring的maven项目POM文件以下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.myimooc</groupId> <artifactId>jmsspring</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>jmsspring</name> <url>http://maven.apache.org</url> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.1.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
2.完成后的目录结构以下
源码请到个人github地址查看
3.测试
使用Postman向ProducerController发起请求,将消息发送出去
对应的ConsumerTopicMessageListener 和 ConsumerMessageListener接收到消息
为何要对消息中间件集群
实现高可用,以排除单点故障引发的服务中断 实现负载均衡,以提高效率为更多客户提供服务
集群方式
客户端集群:让多个消费者消费同一个队列 Broker cluster:多个Broker之间同步消息 Master Slave:实现高可用
ActiveMQ失效转移(failover)-客户端配置
容许当其中一台消息服务器宕机时,客户端在传输层上从新链接到其它消息服务器 语法:failover:(uri1,…,uriN)?transportOptions
transportOptions参数说明
randomize默认为true,表示在URI列表中选择URI链接时是否采用随机策略 initialReconnectDelay默认为10,单位毫秒,表示第一次尝试重连之间等待的时间 maxReconnectDelay默认为30000,单位毫秒,最长重连的时间间隔
Broker cluster集群配置-原理
NetworkConnector(网络链接器)
网络链接器主要用于配置ActiveMQ服务器与服务器之间的网络通信方式,用于服务器透传消息 网络链接器分为静态链接器和动态链接器
静态链接器
动态链接器
ActiveMQ Master Slace集群方案
Share nothing storage master/slave(已过期,5.8+后移除) Shared storage master/slave 共享存储 Replicated LevelDB Store基于负责的LevelDB Store
共享存储集群的原理
基于复制的LevelDB Store的原理
两种集群方式对比
三台服务器的完美集群方案
ActiveMQ集群配置方案
配置过程
1.节点准备
mkdir activemq建立目录 cp -rf apache-activemq-5.15.0 activemq/activemq-a cp -rf apache-activemq-5.15.0 activemq/activemq-b cp -rf apache-activemq-5.15.0 activemq/activemq-c cd activemq mkdir kahadb
2.配置a节点
cd activemq-a/ cd conf/ vim activemq.xml <networkConnectors> <networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" /> </networkConnectors> vim jetty.xml:配置管理端口号,a节点使用默认端口,无须配置
3.配置b节点
vim activemq.xml 配置网络链接器 <networkConnectors> <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)" /> </networkConnectors> 配置持久化存储路径 <persistenceAdapter> <kahaDB directory="/studio/activemq/kahadb"/> </persistenceAdapter> 配置服务端口 <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> vim jetty.xml 配置管理端口号 <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8162"/> </bean>
4.配置c节点
vim activemq.xml 配置网络链接器 <networkConnectors> <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)" /> </networkConnectors> 配置持久化存储路径 <persistenceAdapter> <kahaDB directory="/studio/activemq/kahadb"/> </persistenceAdapter> 配置服务端口 <transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> vim jetty.xml 配置管理端口号 <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8163"/> </bean>
5.启动服务
回到activemq目录,分别启动a,b,c三个节点
./activemq-a/bin/activemq start ./activemq-b/bin/activemq start ./activemq-c /bin/activemq start
检查是否都启动成功
ps -ef |grep activemq
检查是否对外提供服务,即端口是否被监听(占用)
netstat -anp |grep 61616 netstat -anp |grep 61617 netstat -anp |grep 61618
检查发现61618即c节点没有提供服务,可是c节点的进程是启动成功了的。由于b节点和c点击是master/slave配置,如今b节点获取到了共享文件夹的全部权,因此c节点正在等待得到资源,而且提供服务。即c节点在未得到资源以前,是不提供服务的。
测试,把b节点杀掉,看c节点能不能提供61618的服务
./activemq-b/bin/activemq stop netstat -anp |grep 61618 ./activemq-b/bin/activemq start netstat -anp |grep 61617
检查发现,从新启动b节点后,b节点61617端口并无提供服务,是由于如今b节点成为了slave节点,而c节点成为了master节点。因此,如今b节点启动了,可是它并不对外提供服务。只有当c节点出现问题后,b节点才对外提供服务。
6.经过代码测试集群配置是否生效
生产者
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 生产者-队列模式-集群配置测试 * @author ZhangCheng on 2017-07-25 * */ public class AppProducerTest { /** failover 为状态转移的存在部分 * 因a节点只做为消费者使用,因此这里不配置61616节点了。 * */ private static final String URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; /** 指定队列的名称 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.启动链接 connection.start(); // 4.建立会话(第一个参数:是否在事务中处理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 建立一个目标 Destination destination = session.createQueue(QUEUE_NAME); // 6.建立一个生产者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.建立消息 TextMessage textMessage = session.createTextMessage("test" + i); // 8.发布消息 producer.send(textMessage); System.out.println("消息发送:" + textMessage.getText()); } // 9.关闭链接 connection.close(); } }
消费者
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 消费者-队列模式-集群配置测试 * @author ZhangCheng on 2017-07-22 * */ public class AppConsumerTest { /** failover 为状态转移的存在部分 * */ private static final String URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; /** 指定队列的名称 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.启动链接 connection.start(); // 4.建立会话(第一个参数:是否在事务中处理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.建立一个目标 Destination destination = session.createQueue(QUEUE_NAME); // 6.建立一个消费者 MessageConsumer consumer = session.createConsumer(destination); // 7.建立一个监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { System.out.println("接收消息异常:"); e.printStackTrace(); } } }); // 8.关闭链接 //connection.close(); } }
运行生产者,而后到管理界面查看消息发送到了那里
http://127.0.0.1:8161 http://127.0.0.1:8162 http://127.0.0.1:8163
查看发现,8162没法访问,是由于b节点是slave节点,不提供服务,消息都发送到了c节点
把8163即c节点宕掉后,运行消费者,查看消息是否可以使用
./activemq-c/bin/activemq stop
实际业务场景分析
实际业务场景特色
子业务系统都有集群的可能性 同一个消息会广播给关注该类消息的全部子业务系统 同一类消息在集群中被负载消费 业务的发生和消息的发布最终一致性
须要解决的问题
不一样业务系统分别处理同一个消息,同一业务系统负载处理同类消息 解决消息发送时的一致性问题 解决消息处理的幂等性问题 基于消息机制创建事件总线
集群系统处理消息方案-使用JMS级联的解决方案
集群系统处理消息方案-使用ActiveMQ的虚拟主题解决方案
发布者:将消息发布到一个主题中,主题名以VirtualTopic开头,如VirtualTopic.TEST 消费者:从队列中获取消息,在队列名中表名本身身份,如Consumer.A.VirtualTopic.TEST
解决消息发送时的一致性问题-使用JMS中XA系列接口保证强一致性
引入分布式事务 要求业务操做必须支持XA协议
解决消息发送时的一致性问题-使用消息表的本地事务解决方案
解决消息发送时的一致性问题-使用内存日志的解决方案
解决消息处理的幂等性问题
所谓幂等性问题,是指屡次执行所产生的影响(结果)与一次执行所产生的影响(结果)同样。好比:支付成功后,支付宝会发起屡次通知给业务系统,要求业务系统可以处理这些重复的消息,可是又不重复处理订单。若是在消息处理系统中保证幂等性,会增长系统复杂度,咱们能够统一处理幂等性后,再将消息发送给消息处理系统。
解决消息处理的幂等性问题-使用消息表的本地事务解决方案
解决消息处理的幂等性问题-使用内存日志的解决方案
基于消息机制的事件总线-什么是事件驱动架构
事件驱动架构(Event Driven Architecture,EDA)定义了一个设计和实现一个应用系统的方法学,在这个系统里事件可传输于松散耦合的组件和服务之间。特色:有事我叫你,没事别烦我
事件驱动架构模型
该教师正在开发该事件总线的框架,github地址https://github.com/jovezhao/nest。
分析须要作的事
解决各业务系统集群处理同一条消息 实现本身的消息提供者
经常使用消息中间件
ActiveMQ RabbitMQ Kafka
集成RabbitMQ
RabbitMQ:使用交换器绑定到队列
示意图
RabbitMQ消息提供者源码解析
建立ConnectionFactory 建立Connection 建立Channel 定义Exchange 定义Queue而且绑定队列
集成Kafka
Kafka使用group.id分组消费者
配置消息者参数group.id相对时对消息进行负载处理 配置服务器partitions参数,控制同一个group.id下的consumer数量小于partitions Kafka只保证同一个partition下的消息是有序的