AMQP与RabbitMQ

 

 

转载请注明原文地址:http://www.javashuo.com/article/p-qkuxhakj-ne.htmlhtml

 

一:AMQP是什么

  AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是基于JMS进行进一步扩展和优化的异步消息处理协议。spring

  其在JMS的基础上,提供了更多的方法。安全

  AMQP引入了消息交换机Exchange的概念,实现了消息生产者与消息队列之间的解耦。消息再也不直接发送到队列或者主题,而是统一发送给Exchange,由交换机根据路由规则,将消息分发到不一样队列中。服务器

  AMQP还引入了Channel概念,将一个connection细分为不一样channel,适用于多线程场景下,消息消费者与AMQP服务器只需创建一个TCP链接便可,各个线程对应不一样channel,经过channel实现消息的提取。网络

  

二:AMQP模型

  • Broker: 接收和分发消息的应用,RabbitMQ Server就是一个消息处理实体。
  • Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,相似于网络中的namespace概念。当多个不一样的用户使用同一个RabbitMQ server提供的服务时,能够划分出多个vhost,每一个用户在本身的vhost建立exchange/queue等。
  • Connection: publisher/consumer和broker之间的TCP链接。断开链接的操做只会在client端进行,Broker不会断开链接,除非出现网络故障或broker服务出现问题。
  • Channel: 若是每一次访问RabbitMQ都创建一个Connection,在消息量大的时候创建TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部创建的逻辑链接,若是应用程序支持多线程,一般每一个thread建立单独的channel进行通信,AMQP method包含了channel id帮助客户端和message broker识别channel,因此channel之间是彻底隔离的。Channel做为轻量级的Connection极大减小了操做系统创建TCP connection的开销。
  • Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。经常使用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
  • Queue: 消息最终被送到这里等待consumer取走。一个message能够被同时拷贝到多个queue中。
  • Binding: exchange和queue之间的虚拟链接,binding中能够包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

 

三:AMQP消息传送模型

  

  发布者(Publisher)发布消息(Message),传送到broker;多线程

  在broker中,消息被交换机(Exchange)根据路由规则,经过binding传送到不一样的队列;app

  最后 AMQP 代理会将消息投递给订阅了此队列的消费者(push API),或者消费者按照需求自行获取(pull API)。异步

 

四:Exchange分发策略

  Direct:当消息的routing key 与 binding 的 routing key 直接匹配,消息路由到该队列函数

  Topic:   当消息routing key 与 binding 的 routing key 符合通配符匹配,消息路由到该队列(请百度通配符匹配)spring-boot

  Headers:  当消息参数表中的头信息和值都与 binding参数表中匹配的话,消息路由到该队列

    Fanout: 任何消息直接匹配到全部队列上

 

五:RabbitMQ的使用

  一、SpringBoot集成RabbitMQ

  1)配置pom包:spring-boot-starter-amqp

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  2)配置RabbitMQ服务器信息

  首先,须要一台主机安装并启动RabbitMQ。

  而后在项目中配置:

spring.application.name=应用名

spring.rabbitmq.host=RabbitMQ服务器的host
spring.rabbitmq.port=5672
spring.rabbitmq.username=RabbitMQ服务器的登陆账号
spring.rabbitmq.password=密码

  3)配置消息队列

@Configuration
public class RabbitConfig {

    @Bean
    public Queue Queue() {
        return new Queue("hello");
    }

}

  4)使用模版,实现消息生产者

@component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);//自动转换为消息对象并要求发送到hello队列
    }

}

  5)实现消息接收者——建立监听器,监听hello队列,一旦有消息则调用process函数进行处理

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }

}

 

  二、Python使用RabbitMQ

  1)下载RabbitMQ并解压

  下载网址: http://www.rabbitmq.com/install-generic-unix.html 

  解压后,进入 sbin 目录, 运行 server。

  默认端口为5672。

  2)pip安装AMQP协议实现模块——pika

  3)消息生产者:

# -*- coding: utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='first', type='fanout')

channel.queue_declare(queue='hello')
channel.queue_bind(exchange='first', queue='hello')

channel.basic_publish(exchange='first', routing_key='', body='Hello World!')
  • 获取链接.
  • 从链接上获取一个 channel.
  • 声明一个 exchange . (只会建立一次)
  • 声明一个 queue . (只会建立一次)
  • 把 queue 绑定到 exchange 上.
  • 向指定的 exchange 发送一条消息.

 

  4)消息消费者

# -*- coding: utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):#消息处理函数
    print body

channel.basic_consume(callback, queue='hello', no_ack=True)
channel.start_consuming()
  • 获取链接.
  • 从链接上拿到 channel .
  • 声明须要的 queue .
  • 定义一个从指定 queue 获取消息的回调处理.
  • 开始接收消息.

 

  5)查看rabbitMQ的服务状态

  使用 rabbitmqctl 这个工具。例如:查看当前的队列状况

./rabbitmqctl list_queues

 

  

六:深刻

  • 持久化
  • 调度策略
  • 分配策略
  • 状态反馈
相关文章
相关标签/搜索