转载请注明原文地址:http://www.javashuo.com/article/p-qkuxhakj-ne.htmlhtml
AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是基于JMS进行进一步扩展和优化的异步消息处理协议。spring
其在JMS的基础上,提供了更多的方法。安全
AMQP引入了消息交换机Exchange的概念,实现了消息生产者与消息队列之间的解耦。消息再也不直接发送到队列或者主题,而是统一发送给Exchange,由交换机根据路由规则,将消息分发到不一样队列中。服务器
AMQP还引入了Channel概念,将一个connection细分为不一样channel,适用于多线程场景下,消息消费者与AMQP服务器只需创建一个TCP链接便可,各个线程对应不一样channel,经过channel实现消息的提取。网络
发布者(Publisher)发布消息(Message),传送到broker;多线程
在broker中,消息被交换机(Exchange)根据路由规则,经过binding传送到不一样的队列;app
最后 AMQP 代理会将消息投递给订阅了此队列的消费者(push API),或者消费者按照需求自行获取(pull API)。异步
Direct:当消息的routing key 与 binding 的 routing key 直接匹配,消息路由到该队列函数
Topic: 当消息routing key 与 binding 的 routing key 符合通配符匹配,消息路由到该队列(请百度通配符匹配)spring-boot
Headers: 当消息参数表中的头信息和值都与 binding参数表中匹配的话,消息路由到该队列
Fanout: 任何消息直接匹配到全部队列上
1)配置pom包:spring-boot-starter-amqp
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2)配置RabbitMQ服务器信息
首先,须要一台主机安装并启动RabbitMQ。
而后在项目中配置:
spring.application.name=应用名 spring.rabbitmq.host=RabbitMQ服务器的host spring.rabbitmq.port=5672 spring.rabbitmq.username=RabbitMQ服务器的登陆账号 spring.rabbitmq.password=密码
3)配置消息队列
@Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("hello"); } }
4)使用模版,实现消息生产者
@component public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context);//自动转换为消息对象并要求发送到hello队列 } }
5)实现消息接收者——建立监听器,监听hello队列,一旦有消息则调用process函数进行处理
@Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } }
二、Python使用RabbitMQ
1)下载RabbitMQ并解压
下载网址: http://www.rabbitmq.com/install-generic-unix.html
解压后,进入 sbin 目录, 运行 server。
默认端口为5672。
2)pip安装AMQP协议实现模块——pika
3)消息生产者:
# -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='first', type='fanout') channel.queue_declare(queue='hello') channel.queue_bind(exchange='first', queue='hello') channel.basic_publish(exchange='first', routing_key='', body='Hello World!')
4)消息消费者
# -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body):#消息处理函数 print body channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming()
5)查看rabbitMQ的服务状态
使用 rabbitmqctl 这个工具。例如:查看当前的队列状况
./rabbitmqctl list_queues