RabbitMq底层原理分析

RabbitMq消息中间件介绍&为何要使用消息中间件&何时使用消息中间件

咱们用java来举例子, 打个比方咱们客户端发送一个下单请求给订单系统(order)订单系统发送了 一个请求给咱们的库存系统告诉他须要更改库存了,我已经下单了,这里,每个请求咱们均可以看做一条消息,可是咱们客户端须要等待订单系统告诉我这条消息的处理结果(我到底有没有下单成功) 可是 订单系统不须要知道库存系统这条消息的处理状况 由于不管你库存有没有改动成功, 我订单仍是下了, 由于是先下完了订单(下成功了) 才去更改库存, 库存若是更改出BUG了 那是库存系统的问题, 这个BUG不会影响订单系统。 若是这里你能理解的话,那么咱们就能发现咱们用户发送的这条消息(下订单),是须要同步的(我须要知道结果),订单发送给库存的消息,是能够异步的(我不想知道你库存到底改了没,我只是通知你我这边成功下了一个订单)那么若是咱们还按原来的方式去实现这个需求的话, 那么结果会是这样:html

那可能有同窗说了, 咱们订单系统开辟线程去访问库存系统不就行了吗?java

使用线程池解决 确实能够, 可是也有他的缺点, 那么 到底怎么来完美解决这个问题呢?redis

若是这张图能理解的话, 那么 这个消息系统, 就是咱们的消息中间件。spring

RabbitMq介绍&AMQP介绍

导语:咱们刚刚介绍了什么是消息中间件, 那么RabbitMq就是对于消息中间件的一种实现,市面上还有不少不少实现, 好比RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ等等 咱们这节主要讲RabbitMqsql

AMQP编程

这里引用百度的一句话 再加以个人理解: AMQP 其实和Http同样 都是一种协议, 只不过 Http是针对网络传输的, 而AMQP是基于消息队列的 AMQP 协议中的基本概念: •Broker: 接收和分发消息的应用,咱们在介绍消息中间件的时候所说的消息系统就是Message Broker。 •Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,相似于网络中的namespace概念。当多个不一样的用户使用同一个RabbitMQ server提供的服务时,能够划分出多个vhost,每一个用户在本身的vhost建立exchange/queue等。 •Connection: publisher/consumer和broker之间的TCP链接。断开链接的操做只会在client端进行,Broker不会断开链接,除非出现网络故障或broker服务出现问题。 •Channel: 若是每一次访问RabbitMQ都创建一个Connection,在消息量大的时候创建TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部创建的逻辑链接,若是应用程序支持多线程,一般每一个thread建立单独的channel进行通信,AMQP method包含了channel id帮助客户端和message broker识别channel,因此channel之间是彻底隔离的。Channel做为轻量级的Connection极大减小了操做系统创建TCP connection的开销。 •Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。经常使用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。 •Queue: 消息最终被送到这里等待consumer取走。一个message能够被同时拷贝到多个queue中。 •Binding: exchange和queue之间的虚拟链接,binding中能够包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。 Exchange的类型: direct : 这种类型的交换机的路由规则是根据一个routingKey的标识,交换机经过一个routingKey与队列绑定 ,在生产者生产消息的时候 指定一个routingKey 当绑定的队列的routingKey 与生产者发送的同样 那么交换机会吧这个消息发送给对应的队列。 fanout: 这种类型的交换机路由规则很简单,只要与他绑定了的队列, 他就会吧消息发送给对应队列(与routingKey不要紧) topic:(由于*在这个笔记软件里面是关键字,因此下面就用星替换掉了) 这种类型的交换机路由规则也是和routingKey有关 只不过 topic他能够根据:星,#( 星号表明过滤一单词,#表明过滤后面全部单词, 用.隔开)来识别routingKey 我打个比方 假设 我绑定的routingKey 有队列A和B A的routingKey是:星.user B的routingKey是: #.user 那么我生产一条消息routingKey 为: error.user 那么此时 2个队列都能接受到, 若是改成 topic.error.user 那么这时候 只有B能接受到了 headers: 这个类型的交换机不多用到,他的路由规则 与routingKey无关 而是经过判断header参数来识别的, 基本上没有应用场景,由于上面的三种类型已经能应付了。浏览器

RabbitMQ MQ: message Queue 顾名思义 消息队列, 队列你们都知道, 存放内容的一个东西, 存放的内容先进先出, 消息队列, 只是里面存放的内容是消息而已。 RabbitMq 是一个开源的 基于AMQP协议实现的一个完整的企业级消息中间件,服务端语言由Erlang(面向并发编程)语言编写 对于高并发的处理有着自然的优点,客户端支持很是多的语言: •Python •Java •Ruby •PHP •C# •JavaScript •Go •Elixir •Objective-C •Swift安全

RabbitMQ服务端部署网络

在介绍消息中间件的时候所提到的“消息系统” 即是咱们这节的主题:RabbitMq 如同redis同样 他也是采用c/s架构 由服务端 与客户端组成, 咱们如今咱们计算机上部署他的服务端 因为咱们刚刚介绍过了RabbitMQ服务端是由Erlang语言编写因此咱们这里先下载Erlang语言的环境 注意:若是是在官网下的RabbitMQ服务端的话 Erlang语言的版本不能过低, 否则要卸载掉旧的去装新的, 咱们这里下载OTP21.0版本直接从外网下载会很慢, 我这里直接贴上百度网盘的地址(由于这个东西仍是有点大的) pan.baidu.com/s/1pZJ8l2f3… 咱们再去官网下载 他的服务端安装包 www.rabbitmq.com/download.ht… 根据本身的系统选择下载便可 注意! 须要先下载Erlang再下载安装包安装, 否则安装RabbitMQ服务端的时候会提示你本地没有Erlang环境多线程

安装的话, 基本上就是默认的选项不用改 如何看RabbitMq安装完成了? 在系统-服务中找到以下便可:

包括启动 中止 重启 服务等

RabbitMQ安装会附带一个管理工具(方便咱们能直观的查看整个RabbitMQ的运行状态和详细数据等,有点像Navicat 对应Mysql的关系) 值得一提的是, 管理工具和RabbitMQ是两码事 但愿同窗们不要混稀了。 管理工具启动方式: 到大家安装的 RabbitMQ Server\rabbitmq_server-3.7.12\sbin 目录下面 执行一条cmd命令: rabbitmq-plugins enable rabbitmq_management 直接复制这条命令便可 , 固然 嫌每次都要去目录中去执行的麻烦的话, 能够配置一个环境变量 或者在咱们的开始菜单栏中找到这个:

输入完启动命令后 稍微等一下会有结果返回 而后能够打开浏览器 输入 http://127.0.0.1:15672 访问管理页面:

默认帐号密码都是 guest 即 username :guest password:guest 登陆进去以后会看到以下界面(由于我不当心装了2次RabbitMq 因此这里能看到都重复了, 大家本身那不会重复,而后咱们刚刚说了 管理工具和rabbitmq 是两码事 因此端口也就不同)

这个页面在笔记里面介绍起来可能比较复杂, 就不一一介绍了, 我这里讲个重点, 就是线上环境下必定要吧guest用户(固然 guest这个用户只能本机才能登录)删掉而且新加一个用户, 这里就演示一下这个功能 首先 点击admin页签, 在下面找到Add User

而后输入帐号 密码 确认密码 这个Tags实际上是一个用户权限标签, 关于他的介绍能够看官方介绍(点旁边那个小问号就行了,我这里直接翻译他的介绍)

填写完以后点击AddUser 就能够添加一个用户了, 添加完用户以后还要给这个用户添加对应的权限(注:Targ不等于权限) 好比说 我刚刚添加了一个jojo角色

点击这个jojo能够进去给他添加权限 这个权限能够是 Virtual host 级别的 也能够是交换机级别的 甚至是细化到某一个读写操做 我这里就给他添加一个Virtual host权限

这里 咱们给了他 testhost这个Virtual host的权限 正则匹配都是* 也就是全部权限 而后点击set添加完毕 那么管理页面 咱们就讲到这里

RabbitMq快速开始

由于咱们这里是用java来做为客户端, 咱们首先引入maven依赖: <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.1.2</version> </dependency> (注意的是, 我这里引入的是5.x的rabbitmq客户端版本, 那么咱们jdk的版本最好在8以上,反之, 这里就建议使用4.x的版本,这里仅仅讨论jdk8 其余的版本不作讨论) 首先 咱们编写一个链接的工具类: `package com.luban.util;

import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;

import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;

/**

  • 须要咨询java高级VIP课程的同窗能够木兰老师的QQ:2746251334

  • 须要往期视频的同窗能够加加安其拉老师的QQ:3164703201

  • author:鲁班学院-商鞅老师 */ public class ConnectionUtil {

    public static final String QUEUE_NAME = "testQueue";

    public static final String EXCHANGE_NAME = "exchange";

    public static Connection getConnection() throws Exception{ //建立一个链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置rabbitmq 服务端所在地址 我这里在本地就是本地 connectionFactory.setHost("127.0.0.1"); //设置端口号,链接用户名,虚拟地址等 connectionFactory.setPort(5672); connectionFactory.setUsername("jojo"); connectionFactory.setPassword("jojo"); connectionFactory.setVirtualHost("testhost"); return connectionFactory.newConnection(); }

}`

而后咱们编写一个消费者(producer),和一个生产者(consumer): 生产者: `public class Consumer {

public static void sendByExchange(String message) throws Exception {

    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    //声明队列
    channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
    // 声明exchange
    channel.exchangeDeclare(ConnectionUtil.EXCHANGE_NAME, "fanout");
    //交换机和队列绑定
    channel.queueBind(ConnectionUtil.QUEUE_NAME, ConnectionUtil.EXCHANGE_NAME, "");
    channel.basicPublish(ConnectionUtil.EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println("发送的信息为:" + message);
    channel.close();
    connection.close();
}
复制代码

} `

消费者: `public class Producer {

public static void getMessage() throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
复制代码

// channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null); DefaultConsumer deliverCallback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } }; channel.basicConsume(ConnectionUtil.QUEUE_NAME, deliverCallback); }

} `

这里, 咱们演示绑定fanout的类型的交换机, 因此不须要routingKey 就能够路由只须要绑定便可 (可能有同窗要问了, 若是没有绑定交换机怎么办呢? 没有绑定交换机的话, 消息会发给rabbitmq默认的交换机里面 默认的交换机隐式的绑定了全部的队列,默认的交换机类型是direct 路由建就是队列的名字) 基本上这样子的话就已经进行一个快速入门了, 因为咱们如今作项目基本上都是用spring boot(就算没用spring boot也用spring 吧) 因此后面咱们直接基于spring boot来说解(rabbitmq的特性,实战等)

相关文章
相关标签/搜索