一文看懂Rabbitmq,从安装到实战演练

Rabbitmq的初步使用

随着微服务概念发展,大应用逐步拆分为小应用,提升开发效率,专门的人作专门的事情,逐渐的流行起来。git

在微服务上实现通讯的方式大部分是采用rpc方式,也有升级版本的grpc。github

还有另一种实现就是使用mq来进行解耦。spring

今天初识mq,快速入门先,准备一个环境实现案例,该文涉及如下内容:docker

  • 安装rabbitmq
  • mq能解决的问题
  • 实战演练

安装

rabbitmq的安装咱们采用docker的方式,docker方便咱们快速的实现rabbitmq的安装,不须要再对安装mq进行头疼。微信

docker 的两种方式并发

docker方式

//拉取mq镜像
docker pull rabbitmq
//启动mq
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9

复制代码

说明:app

  1. -d 后台运行容器;
  2. --name 指定容器名;
  3. -p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
  4. -v 映射目录或文件;
  5. --hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
  6. -e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)

docker-compose 方式

version: "3"
services:
 rabbit:
 image: docker.infervision.com/library/rabbitmq:3-management
 ports:
 - "4369:4369"
 - "5671:5671"
 - "5672:5672"
 - "15671:15671"
 - "15672:15672"
 restart: always
 environment:
 - RABBITMQ_DEFAULT_USER=test
 - RABBITMQ_DEFAULT_PASS=test
 volumes:
 - /home/ruiqi/Desktop/disk/rabbitmq:/var/lib/rabbitmq
 container_name: rabbitmq

在该文件目录下执行:docker-compose up -d
复制代码

下载的rabbitmq内置管理界面,ip:15672 用户名与密码是咱们在启动是写入的。 异步

2019-06-12-17-31-25

mq能解决什么?

通俗的来讲,主要使用MQ来解决如下三个问题。ide

异步消息

在业务中,常常会遇到同时发送邮件,短信或者其余通知内容服务。业务初期,采用同步或者异步处理方式都须要等发送完毕后再返回给客户端。中间有必定的延迟spring-boot

2019-06-12-17-49-08

业务增加后,此方式系统性能就会形成很大的浪费。采用消息队列,将这几个服务进行解耦,只需将消息内容发送到消息队列中,下降用户的等待时间,体验效果比原先好不少。

2019-06-12-17-49-27

应用间解耦

同一个服务中可能须要其余服务的配合才能完成一项业务操做.仍是拿常见的购物案例来讲明。

在京东下单支付后,消息要通知到商家,邮件通知用户已经购买某商品。

若是这两种操做都采用同步执行,用户等待时间会变长。

采用mq方式以后,订单系统将消息持久化到mq上,返回给用户下单成功。

  • 商家接收到用户的下单信息,进行处理,若是有库存管理那么须要进行库存处理。
  • 邮件通知用户,告知用户下单成功。

mq保证消息的可靠投递,不会致使消息丢失,保证消息的高可靠性。若是库存出现失败也不会致使用户下单失败的状况,能够从新进行投递。

流量削峰

流量削峰,通常是同一时间涌进来不少请求,后台处理不过来。那么须要采用削峰方式来处理。

简单来讲是经过一个队列承接瞬时过来流量洪峰,在消费端平滑的将消息推送出去,若是消费者消费不及时能够将消息内容持久化在队列中,消息不存在丢失。

  1. 消费端不及时进行消费,还能够动态的扩增消费者数量,提升消费速度。
  2. 设定相关的阀值,多余的消息直接丢弃,告知用户秒杀失败等业务消息内容。

摘自简书

实战案例

本文是按照Java语言进行,使用Spring boot搭建,包管理工具Gradle。

导入rabbitmq jar包

compile("org.springframework.boot:spring-boot-starter-amqp:1.5.10.RELEASE")
复制代码

配置mq

yaml 文件配置

spring:
 rabbitmq:
 host: 192.168.110.5
 port: 5672
 username: tuixiang
 password: tuixiang
复制代码

准备好模板类,供后面直接使用

package com.infervision.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** * @author: fruiqi * @date: 19-2-18 下午2:42 * @version:1.0 rabbit配置 **/
@Configuration
public class RabbitConfig {

    /** * 日志 **/
    private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);


    @Value("${spring.rabbitmq.username}")
    String userName;

    @Value("${spring.rabbitmq.password}")
    String userPassword;

    @Value("${spring.rabbitmq.host}")
    String host;

    @Value("${spring.rabbitmq.port}")
    Integer port;

    /** * 注入 * * @param * @return com.rabbitmq.client.Connection * @author fruiqi * @date 19-1-22 下午5:41 **/
    @Bean
    public ConnectionFactory getConnection() throws Exception {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUsername(userName);
        factory.setPassword(userPassword);
        factory.setHost(host);
        factory.setPort(port);
        return factory;
    }


    /** * 建立制定的 监听容器 * * @param queueName 监听的队列名字 * @param listenerChannel 设置是否将监听的频道 公开给已注册的 * @param PrefetchCount 告诉代理一次请求多少条消息过来 * @param ConcurrentConsumers 制定建立多少个并发的消费者数量 * @param acknowledgeMode 消息确认模式 * @param listener 监听器 * @return org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer **/
    public SimpleMessageListenerContainer setSimpleMessageListenerContainer(String queueName, boolean listenerChannel, int PrefetchCount, int ConcurrentConsumers, AcknowledgeMode acknowledgeMode, ChannelAwareMessageListener listener) throws Exception {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(getConnection());
        container.setQueueNames(queueName);
        container.setExposeListenerChannel(listenerChannel);
        container.setPrefetchCount(PrefetchCount);
        container.setConcurrentConsumers(ConcurrentConsumers);
        container.setAcknowledgeMode(acknowledgeMode);
        container.setMessageListener(listener);
        return container;
    }
}


package com.infervision.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** * @author: fruiqi * @date: 19-2-18 下午2:51 * @version:1.0 **/
@Component
public class MsgSender {


    private static final Logger logger = LoggerFactory.getLogger(MsgSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** * @param exchange 交换机名称 * @param routingKey 路由名称 * @param message 消息内容 * @return void * @description //TODO 发送消息到消息队列中 **/
    public void sendMsg(String exchange, String routingKey, Object message) {
        try {
            rabbitTemplate.convertAndSend(exchange,routingKey,message);
        }catch (Exception e){
            logger.error("[ERROR] send statistic message error ",e);
        }
    }

}
复制代码

实例连接mq

在使用rabbitmq 有的时候须要本身客户端建立queue,但有的时候并非本身建立,在rabbitmq页面上进行建立queue,其余消费者直接引用。

客户端建立mq

//初始化队列,若是队列已存在,则不做任何处理 若是有权限控制以下操做并不能实现
    @Bean
    public Queue dicomQueue() {
        return new Queue(getMacPreStr(DICOM_QUEUE_NAME));
    }

    //初始化交换机
    @Bean
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange((DEFAULT_TOPIC_EXCHANGE).durable(true).build();
    }

    // 将队列与交换机按照路由规则进行绑定
    @Bean
    Binding bindingExchangeDicomQueue(Queue dicomQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(dicomQueue).to(topicExchange).with(DICOM_QUEUE_ROUTING_KEY);
    }

复制代码

使用

队列的使用:一个是发送,属于生产者;一个是监听,属于消费者.

生产者实现

在mq配置模板类中,专门实现了一个发送类,发送文件内容,直接调用发送接口便可。

@Autowired
    RabbitService rabbitService;

    /** * 练习 发送数据到 mq中 * 1. 发送的数据会到 mq中 * 2. 咱们配置的 listener 是用来消费消息的 * 3. 客户端配置 能够参考 RabbitClientConfig * @param name 名字编号 * @param vo 实体内容 * @return: com.infervision.model.NameVo */
    @ApiOperation(value = "增长name信息", notes = "实体信息")
    @PostMapping(value = "/{name}")
    @ApiImplicitParam(paramType = "query", name = "name", value = "用户名字", required = true, dataType = "string")
    public NameVo addNameVo(@RequestParam String name, @RequestBody NameVo vo) {
        rabbitService.sendMessage(DEFAULT_TOPIC_TEST_EXCHANGE, LABEL_FIEL_XML_QUEUE_ROUTING_KEY, JSON.toJSONString(vo));
        return vo;
    }


   @Service
public class RabbitServiceImpl implements RabbitService {

    @Autowired
    MsgSender msgSender;

    /** * 尝试发送 message 到mq中 * @param message * @return: void */
    @Override
    public void sendMessage(String exchange, String routingKey,String message) {
        msgSender.sendMsg(exchange, routingKey, message);
    }
}

复制代码

消费者实现

消费者实现有两种方式,一种经过注解的方式监听,一种是实现ChannelAwareMessageListener类来实现消费。

注解实现监听

//在方法上进行注入。配置工厂帮助提升单个消费者一次性消费的消息数量,设置多少个消费者,用来提升程序的性能
@RabbitListener(queues = "dicom.queue",containerFactory = "multipleConsumerContainerFactory")
    public void processDicomMessage(Message message, Channel channel) {
            logger.info(message);
    }

// 工厂能够在配置模板类中中配置好。
@Bean("multipleConsumerContainerFactory")
    public SimpleRabbitListenerContainerFactory multipleConsumerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(50);
        factory.setConcurrentConsumers(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
复制代码

实现接口方式

/** * 建立监听器。 * @author fruiqi * @date 19-2-11 下午4:18 * @param labelStatisticsListener 监听器 * 调用咱们公用的方法 **/
    @Bean
    public SimpleMessageListenerContainer mqMessageContainer(LabelStatisticsListener labelStatisticsListener) throws Exception {
        SimpleMessageListenerContainer container = rabbitConfig.setSimpleMessageListenerContainer(“queue_name”,
                true, rabbitProperties.getMaximumDelivery(),
                rabbitProperties.getConsumer(), AcknowledgeMode.MANUAL, labelStatisticsListener);
        return container;
    }


@Component
public class LabelStatisticsListener implements ChannelAwareMessageListener {


    private static final Logger logger = LoggerFactory.getLogger(LabelStatisticsListener.class);

    /** * 处理传输过来的数据 * @param message 传送的消息内容 * @param channel 实现通道 * @return: void */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String mes = new String(message.getBody());
        logger.info("[INFO] message is {}",mes);

        // 手动应答 消息已消费
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }
}

复制代码

总结

以上内容就完成了rabbitmq 从搭建到使用所有的流程。固然里面还有更多的可让咱们去探讨,好比mq的队列模式,一个系统配置多个mq等等内容。敬请期待咱们下一篇mq系列内容。

你们在系统中使用过mq吗?大家使用的mq是什么样的?能够在留言区咱们一块儿探讨哦。

代码存放在:github中

·END·

路虽远,行则必至

本文原发于 同名微信公众号「胖琪的升级之路」,回复「1024」你懂得,给个赞呗。

微信ID:YoungRUIQ

公众号
相关文章
相关标签/搜索