什么是消息?java
消息是一个或者多个实体之间沟通的一种方式而且无处不在。linux
自从计算机发明以来,计算机以多种多样的方式发送消息,消息定义了软硬件或者应用程序之间的沟通方式。消息老是有一个发送者和多个接受者,消息有synchronous和asynchronous、pub-sub和peer-to-peer, RPC和enterprise-based, Message Broker, ESB (Enterprise Service Bus), MOM
(Message Oriented Middleware)等。git
从这些咱们能够确定,消息使分布式沟通变得松耦合,意味着发送者如何发送和发送了什么消息并不重要,接受者在消费消息时并不会告知发送者。github
Spring Boot RabbitMQweb
由于像Sun/Oracle/IBM的JMS,Microsoft的MSMQ,他们所使用的协议都是有全部权的,咱们也知道JMS仅仅只是提供了一些接口API,可是若是说要尝试着去混合技术和编程语言去使用JMS,那这个就会至关的复杂。技术的不断革新,给咱们带来的一个结果就是有复杂的问题出现,就会有层出不穷的简单的解决办法被提出。在计算机消息领域,特别须要感谢JPMorgan团队,由于他们发明了AMQP (Advance Message Queuing Protocol )协议。AMQP是为MOM(Message Oriented Middleware)设计的一个开放式标准应用层协议。换句话说,你使用这个协议你就可使用任何技术或者语言。算法
在实现了AMQP协议的诸多消息系统当中,RabbitMQ是其中应用最普遍的一个之一,是MQ产品的典型表明,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。spring
本文主要介绍在Spring Boot中如何整合RabbitMQ并使用RabbitMQ。shell
安装RabbitMQapache
本实例是在CentOS7上安装RabbitMQ,不一样的系统请经过搜索引擎查找不一样的安装方式。编程
一、安装erlang 语言环境
wget http://www.erlang.org/download/otp_src_20.1.tar.gz //下载erlang包 tar -xzvf otp_src_20.1.tar.gz //解压 cd otp_src_20.1/ //切换到安装路径 ./configure --prefix=/usr/local/erlang //生产安装配置 make && make install //编译安装
vi /etc/profile
在底部添加以下内容
#set erlang environment ERL_HOME=/usr/local/erlang PATH=$ERL_HOME/bin:$PATH export ERL_HOME PATH source /etc/profile //生效
测试一下是否安装成功,在控制台输入命令erl
erl
若是进入erlang的shell则证实安装成功,退出便可。
注意:执行./configure –prefix=/usr/local/erlang命令时可能会报以下错误:
configure: error: No curses library functions found configure: error: /bin/sh '/home/jiayi/otp_src_20.1/erts/configure' failed for erts
可尝试执行以下命令解决:
yum -y install ncurses-devel
二、安装RabbitMQ
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.0/rabbitmq-server-generic-unix-3.7.0.tar.xz //下载RabbitMQ安装包 xz -d rabbitmq-server-generic-unix-3.7.0.tar.xz tar -xvf rabbitmq-server-generic-unix-3.7.0.tar
解压后多了个文件夹rabbitmq-server-3.7.0 ,重命名为rabbitmq以便记忆。
mv rabbitmq_server-3.7.0/ rabbitmq
vi /etc/profile
在底部添加以下内容
#set rabbitmq environment export PATH=$PATH:/data/rabbitmq/rabbitmq/sbin source /etc/profile //生效
rabbitmq-server -detached //启动rabbitmq,-detached表明后台守护进程方式启动。
查看状态,若是显示以下截图说明安装成功:
rabbitmqctl status
其余相关命令
启动服务:rabbitmq-server -detached 查看状态:rabbitmqctl status 关闭服务:rabbitmqctl stop 列出角色:rabbitmqctl list_users
配置网页插件
首先建立目录,不然可能报错
mkdir /etc/rabbitmq
而后启用插件
rabbitmq-plugins enable rabbitmq_management
配置防火墙
配置linux 端口 15672 网页管理 5672 AMQP端口:
firewall-cmd --permanent --add-port=15672/tcp firewall-cmd --permanent --add-port=5672/tcp systemctl restart firewalld.service
如今你在浏览器中输入服务器IP:15672 就能够看到RabbitMQ的WEB管理页面了,是否是很兴奋,但是你没有帐号密码,别急。
配置访问帐号密码和权限
默认网页是不容许访问的,须要增长一个用户修改一下权限,代码以下
rabbitmqctl add_user superrd superrd //添加用户,后面两个参数分别是用户名和密码,我这都用superrd了。 rabbitmqctl set_permissions -p / superrd ".*" ".*" ".*" //添加权限 rabbitmqctl set_user_tags superrd administrator //修改用户角色
而后就能够远程访问了,而后可直接配置用户权限等信息。
RabbitMQ/AMQP: Exchanges, Bindings和Queues
AMQP定义了三个很是容易理解的概念,分别是Exchanges、Bindings、Queues。
Exchanges:投递消息到queue都是经由exchanges完成的,每一个exchange携带一个消息而且路由到Queues。
Bindings:消息的路由涉及到一个和exchanges类型及一些规则相关的算法,也就是说消息的路由须要遵照必定的规则,这些规则咱们就称之为Bindings。
Queues:Queues是Massage的落脚点和等待接收的地方。
AMQP定义了四个exchange类型,分别为Direct, Fanout, Topic和Headers。下图展现了不一样exchange类型的特性。
虽然AMQP定义了不一样的Exchange类型,可是其主要思想都是发送一个包含routing key的消息到exchange,而后exchange基于它本省的类型会分发消息到queue。若是没有匹配到queue消息就会被扔到黑洞
default exchange将会自动与每一个已经建立的queue绑定。
direct exchange将会经过routing key与指定的queue绑定,你能够看到这种exchange类型它是做为一对一的绑定。
topic exchange和direct exchange比较相似,惟一的不一样点就是在他的binding中你能够在routing key中添加通配符。
headers exchange和topic exchange相似,惟一不一样的地方就是binding是基于消息头(message headers),这是一个很是强大的exchange,在消息头里面你能够作全部事情且能够添加任何表达式。
fanout exchange会拷贝消息发送给全部绑定的queues;这种exchange能够做为消息广播。
本文经过使用默认exchange类型来描述如何整合Spring Boot与RabbitMQ。
一、构建消息消费者
配置pom.xml以下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.springboot</groupId> <artifactId>rabbitereceiver</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitereceiver</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
在pom.xml中包含了spring-boot-starter-amqp starter pom,这个pom包含了全部连接到RabbitMQ中间件所须要的spring-amqp和rabbitmq-client库。
新建一个Spring Boot项目,添加Consumer类并写入以下代码。
package com.springboot.rabbitereceiver.rabbitmq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { private static final Logger log = LoggerFactory.getLogger(Consumer.class); @RabbitListener(queues="${myqueue}") public void handler(String message){ log.info("Consumer> " + message); } }
@Component:这个注解把普通pojo实例化到spring容器中;
@RabbitListener:此注解标记的方法(该注解也能够注解类)是为全部接收到的消息所建立的处理器,意思就是说他能够连接到RabbitMQ的queue上并建立一个监听并将消息传递到该方法。这种监听器能够经过使用正确的消息转换器尽它最大的努力将消息转换成合适的类型。
在application.properties写入以下内容。
myqueue=spring-boot spring.rabbitmq.host=192.168.199.227 spring.rabbitmq.username=superrd spring.rabbitmq.password=superrd spring.rabbitmq.port=5672 spring.rabbitmq.dynamic=true
一、构建消息生产者
咱们在Spring Boot使用JDBC Template访问数据一文中所建的项目的基础上,将该项目改形成消息的生产者,咱们的需求是在每次刷新页面时生产一条消息。
首先在pom.xml中添加amqp的依赖。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
新建rabbitmq包并增长Producer类,添加以下代码。
package com.springboot.rabbitmq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class Producer { private static final Logger log = LoggerFactory.getLogger(Producer.class); @Autowired RabbitTemplate rabbitTemplate; public void sendTo(String routingkey,String message){ log.info("Sending> ..."); this.rabbitTemplate.convertAndSend(routingkey,message); } }
@Autowired RabbitTemplate:RabbitTemplate是一个同步访问RabbitMQ发送和接受的一个简化的帮助类。
sendTo(routingKey,message):该方法有两个参数routing Key和message,在本实例中,routing Key就是queue的名字。该方法使用rabbitTemplate实例调用convertAndSend方法接受routing key和message。message将会被路由到exchange,而后exchange会将消息路由到正确的queue。
在application.properties中写入的内容同消费者一直,这里再也不给出。
在JournalService中增长以下代码:
@Value("${myqueue}") String queue; @Bean Queue queue(){ return new Queue(queue,false); } @Autowired Producer producer; public void sendMsg(String msg){ producer.sendTo(queue, msg+" at " + new Date()); }
@Value:从application.properties取值;
@Bean Queue:实例化一个Queue类型的bean并使用提供的queue字符串建立一个Queue。Queue的构造函数接受queue的名称和是否在服务重启时持久化queue。
在页面加载时,咱们就经过service调用sendMsg方法来生产一条消息。
@Controller public class JournalController { @Autowired JournalService service; @RequestMapping("/") public String index(Model model){ model.addAttribute("journal", service.findAll()); service.sendMsg("Load index page"); return "index"; } }
在启动项目以前,经过RabbitMQ网页管理工具添加一个queue。
启动两个项目,刷星web页面,在消息消费者的控制台您能够看到以下内容。
2017-12-18 15:54:09.947 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:09 CST 2017 2017-12-18 15:54:10.648 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:10 CST 2017 2017-12-18 15:54:10.827 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:10 CST 2017 2017-12-18 15:54:11.007 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:11 CST 2017 2017-12-18 15:54:11.183 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:11 CST 2017 2017-12-18 15:54:11.336 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:11 CST 2017 2017-12-18 15:54:19.452 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:19 CST 2017 2017-12-18 15:54:19.614 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:19 CST 2017 2017-12-18 15:54:19.797 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:19 CST 2017 2017-12-18 15:54:19.987 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Con