目录java
除了上篇文章所讲的 ActiveMQ,还有一种流行的开源消息中间件叫 RabbitMQ。和 ActiveMQ 相比,它具备更高的性能。web
RabbitMQ 再也不基于 JMS 规范,也没有选择 Java 做为底层实现语言。 它基于另外一种消息通讯协议,名为 AMQP,并采用 Erlang 语言做为技术实现。 RabbitMQ 提供了众多语言客户端,可以与 Spring 框架整合,Spring Boot 也提供了对 RabbitMQ 的支持。spring
RabbitMQ 官网: http://www.rabbitmq.comdocker
RabbitMQ 官方已经提供了本身的 Docker 容器,先下载 rabbitmq:3-management 镜像来启动 RabbitMQ 容器, 之因此选择这个镜像是由于它拥有一个 web 控制台,能够经过浏览器来访问。shell
docker pull rabbitmq:3-management
RabbitMQ 除了控制台,还提供了 HTTP API 方式,可方便应用程序使用。浏览器
下面使用以下 Docker 命令启动 RabbitMQ服务器
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management
在启动 RabbitMQ 容器时,它对宿主机暴露了两个端口号架构
此外,启动时还有两个环境变量app
RabbitMQ 容器启动完毕后,打开浏览器,并在地址栏中输入 http://localhost:15672/
,而且输入登陆的用户名和密码,就能够看到控制台以下所示
框架
在上面管理界面中,包含 6 个功能菜单
RabbitMQ 只有 Queue, 没有 Topic,由于可经过 Exchange 与 Queue 的组合来实现 Topic 所具有的功能。RabbitMQ 的消息模型以下图所示
在 Exchange 和 Queue 间有一个 Binding 关系,当消息从 Producer 发送到 Exchange 中时,会根据 Binding 来路由消息的去向。
所以可将 Binding 理解为 Exchange 到 Queue 的路由规则,这些规则可经过 RabbitMQ 所提供的客户端 API 来控制,也可经过 RabbitMQ 提供的控制台来管理。
RabbitMQ 提供了一个默认的 Exchange(AMQP default),在控制台的 Exchange 菜单中就能够看到它,简单状况下,只须要使用默认的 Exchange 便可,当须要提供发布与订阅功能时才会使用自定义的 Exchange。
下面咱们就将 Spring Boot 与 RabbitMQ 进行整合,先开发一个服务端做为消息的消费者,再开发一个客户端做为消息的生产者,随后运行客户端,并查看服务端中接收到的消息。
建立一个名为 rabbitmq-hello-server 的 maven 项目或者 Spring Starter Project, 在 pom.xml 文件中添加下面 Maven 依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.19.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
Spring Boot 框架中已经添加了对 RabbitMQ 的支持,只须要依赖 spring-boot-starter-amqp
就能够启动 RabbitMQ,此时还须要在 application.properties
配置文件中添加 RabbitMQ 的相关配置项
spring.rabbitmq.addresses=127.0.0.1:5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin
接下来建立 HelloServer 类,封装服务端代码
package demo.msa.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class HelloServer { @RabbitListener(queues = "hello-queue") public void receive(String message) { System.out.println(message); } }
只须要在 receive()
方法上定义 @RabbitListener
,而且设置 queues
参数来指定消费者须要监听的的队列名称。
最后,编写一个 Spring Boot 应用程序启动类来启动服务器
package demo.msa.rabbitmq; import org.springframework.amqp.core.Queue; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class RabbitmqHelloServerApplication { @Bean public Queue helloQueue() { return new Queue("hello-queue"); } public static void main(String[] args) { SpringApplication.run(RabbitmqHelloServerApplication.class, args); } }
在 RabbitMQ 中,必须经过程序来显式建立队列。服务端启动完毕后,将持续监听 RabbitMQ 的 hello-queue 队列中即将到来的消息,该消息由客户端来发送。
建立一个名为 rabbitmq-hello-client 的 maven 项目或者 Spring Starter Project, pom 中的依赖与服务端一致。客户端的 application.properties 文件与服务端一致。
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.19.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
接下来建立一个名为 HelloClient 的类,将其做为客户端
package demo.msa.rabbitmq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class HelloClient { @Autowired private RabbitTemplate rabbitmqTemplate; public void send(String message) { rabbitmqTemplate.convertAndSend("hello-queue", message); } }
最后编写 Spring Boot 应用程序启动类来启动客户端
package demo.msa.rabbitmq; import javax.annotation.PostConstruct; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class RabbitmqHelloClientApplication { @Autowired private HelloClient helloClient; @Bean public Queue helloQueue() { return new Queue("hello-queue"); } @PostConstruct public void init() { helloClient.send("hello world!"); } public static void main(String[] args) { SpringApplication.run(RabbitmqHelloClientApplication.class, args).close(); } }
与服务端同样,此处使用 @Bean
注解的 helloQueue()
方法建立一个名为 hello-queue
的队列,这样能够保证当客户端在服务端以前启动时,也能建立所需的队列。并且 RabbitMQ 能够确保不会建立同名的队列,所以可分别在服务端与客户端建立同名的队列。
运行 main 方法能够启动客户端应用程序,此时将在服务端看到客户端发送的消息,也能够在 RabbitMQ 控制台中看到消息队列当前的状态。
上面发送和接收的消息只是 String 类型,若是发送的消息是一个普通的 Java Bean 类型,应该如何调用呢?
Java Bean 类型则必须实现 Serializable
序列化接口才能正常调用,这是由于 RabbitMQ 所传送的消息是 byte[]
类型,当客户端发送消息须要进行序列化(也就是讲 Java 类型转换为 byte[] 类型),当服务端接收消息前须要先反序列化,所以发送和接收的消息对象必须实现 JDK 的序列化接口。
除了这种序列化方式外,咱们也可使用 Jackson 来实现,并且 RabbitMQ 已经为咱们提供了 jackson 序列化的方式,这种方式更加高效。所须要作的是定义一个 Jackson2JsonMessageConverter
的 Spring Bean。
@Bean public Jackson2JsonMessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
RabbitMQ 的性能很是高效和稳定,也能很是方便的与 Spring Boot 应用程序集成,还拥有很是丰富的官方文档和控制台,所以选择 RabbitMQ 做为服务之间的异步消息调用平台,将成为整个微服务架构中的 "消息中心"。