消息队列(MQ)

消息队列主要解决问题python

主要解决应用耦合,异步处理,流量削锋等问题服务器

消息队列应用场景并发

1.应用耦合:多应用间经过消息队列对同一消息进行处理,避免调用接口失败致使整个过程失败异步

2.异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息, 相比串行处理,减小处理时间函数

3.限流削锋:普遍应用于秒杀或抢购活动中,避免流量过大致使应用系统挂 掉的状况spa

4.消息驱动系统:系统分为消息队列、消息生产者、消息消费者,生产者 负责产生消息,消费者(可能有多个)负责对消息进行处理code

经常使用的消息队列cdn

kafka、RabbitMQ、RocketMQ、ActiveMQ、ZeroMQ、MetaMQ,Redis也可实现队列功能中间件

消息队列的两种模式blog

1.点对点模式:一个生产者对应一个消费者,消息一旦被消费就不会存在于队列中;生产者和消费者彼此不依赖;接收者在成功接收消息以后需向队列应答成功,以便消息队列删除当前接收的消息

2.发布/订阅模式:发布者将消息发送到Topic,系统将这些消息传递给多个订阅者

特色:1.每一个消息能够有多个订阅者2.发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须建立一个订阅者以后,才能消费发布者的消息3.为了消费消息,订阅者须要提早订阅该角色主题,并保持在线运行

python中使用RabbitMQ

1.生产者

#-*- coding:utf-8 -*-
import pika
import sys
username = 'admin'   #指定远程rabbitmq的用户名密码
pwd = 'admin'
user_pwd = pika.PlainCredentials(username, pwd)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('10.5.190.22', credentials=user_pwd))  #建立链接
chan = s_conn.channel()  #在链接上建立一个频道


chan.queue_declare(queue='uploadFileQueue',durable=True)   #声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另外一方能正常运行
chan.queue_bind(exchange='uploadExchange',     # 绑定队列
                queue="uploadFileQueue",
                routing_key="upload_key")

chan.basic_publish(exchange='uploadExchange',     #交换机 #uploadExchange upload_key
                   routing_key='upload_key',   #路由键,写明将消息发往哪一个队列,本例是将消息发往队列hello
                   body = 'hello'
                    
print("[生产者] send 'hello world")

# s_conn.close()#当生产者发送完消息后,可选择关闭链接
复制代码

2.消费者

#-*- coding:utf-8 -*-

import pika
username = 'admin'#指定远程rabbitmq的用户名密码
pwd = 'admin'
user_pwd = pika.PlainCredentials(username, pwd)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('10.5.190.22', credentials=user_pwd))#建立链接
chan = s_conn.channel()#在链接上建立一个频道

chan.queue_declare(queue='uploadFileCallBackQueue',durable=True) #声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另外一方能正常运行
chan.queue_bind(exchange='uploadCallBackExchange',
                queue="uploadFileCallBackQueue",
                routing_key="upload_callback_key")

def callback(ch,method,properties,body): # 定义一个回调函数,用来接收生产者发送的消息
    print(type(body))
    print(body)
    print(body.decode("utf-8"))
    print("[消费者] recv %s" % body)

chan.basic_consume(callback,      # 调用回调函数,从队列里取消息
                   queue='uploadFileCallBackQueue', # 指定取消息的队列名
                   no_ack=True)   # 取完一条消息后,不给生产者发送确认消息,默认是False的,即 默认给rabbitmq发送一个收到消息的确认,通常默认便可
print('[消费者] waiting for msg .')
chan.start_consuming()            # 开始循环取消息
复制代码

备注:当消费的消息进行处理以后,须要将处理后的消息发送到对应的队列中,能够在消费者处理消息完成时候调用生产者的函数进行发送消息

RabbitMQ的了解

基于AMQP协议,支持多种语言,用来实现系统与系统之间,程序与程序之间进行通讯的中间件,总体来看是一个异步的过程,由生产者(Publish)来生产消息,这个消息会被先放到一个容器中,当知足必定条件时,这个消息会被消费者(Subscribe )拿走去消费。这个容器就是队列。生产者和消费者之间遵照的协议就是AMQP协议。其次还能够对消费者设置一个优先级(Priority),以及对消费者的请求进行限流,对负载进行有效均衡。

AMQP的核心是Producer(消息生产者)、Broker(消息队列的服务器实体)、Consumer(消息消费者)

Producer/Consumer概念比较好理解,无非就是一个生产者建立一个信息去由消费者去进行相关的逻辑处理。

roker消息队列的服务器,一个Broker能够包含多个VirtualHost(虚拟主机),主要起到了一个隔离的做用。 而一个VirtualHost又包括如下三部分

Exchange(交换机):由它按照某些规则 去决定消息最终路由到哪一个队列。

Binding:绑定,它的做用就是把 Exchange 和 Queue 按照路由规则绑定起来。若是没有bind,消息会直接被丢掉。

Queue:存储消息的地方,每一个消息都会被投入到一个或多个队列。。

相关文章
相关标签/搜索