RabbitMQ跟CAP简单入门

以前待了7年的公司倒闭,终于找了一份真正的程序员工做,总算体验了996的感受,如今项目接近尾声了,总算有点时间下写博客了。找工做时看到中高级工程师都要求熟练\精通掌握RabbitMQ跟CAP,而作为中级开发工程师的我意识到,不得不学,这几天找了时间学习了下。程序员

如下个人理解说法不知规不规范,只是用我最通俗的理解写出来数据库

RabbitMQ是一种底层队列的实现(Kafka也是一种队列),CAP提供了一种通用队列发布、订阅使用方法。服务器

能够理解为SqlServer、MySql跟EF的关系吧(EF比做为CAP),你不经过EF,也能够用SqlClient相关类来使用SqlServer,但EF提供一种通用的代码使用方法,一样的代码能够同时用于SqlServer、MySql,代码使用者不用关心我底层是用了SqlServer仍是MySql。分布式

RabbitMQ简单介绍微服务

我先说下RabbitMQ原生的使用方法,而后再说下怎么跟CAP结合性能

说到队列,先理解下如下几个概念学习

  • 生产者:也能够说是发布者,主要是发布消息,发送给交换器;
  • 消费者:也能够说是订阅者,从队列中订阅消息进行处理并返回应答;
  • 交换器:可能链接多个队列,将生产者发布的消息发送到队列中;
  • 队列:存放生产者产生的消息,供消费者进行订阅处理;

消息从发布到订阅的流程步骤:测试

生产者发布消息给交换器(传递一个key值),交换器在它绑定的队列中根据key值及交换器模式找到匹配的队列发送消息,订阅了此队列的消费者就能够获取消息进行处理,并返回应答;spa

交换器模式code

  • direct 消息发送到RouteKey彻底匹配的队列中
  • fanout 消息转发到交换器绑定的全部队列中
  • topic 消息发送到RouteKey模糊匹配的队列中
  • header 会用headers属性来进行匹配,性能最差(实际使用中不多)

topic匹配规则:队列的key为TestRouteKey.#,能够匹配到 TestRouteKey.A.B,队列的key为TestRouteKey.*,能够匹配到TestRouteKey.A

如下是须要注意理解的点

  • 生产者也能够直接发布消息到队列中;
  • 若是交换器没有绑定任何队列,那发布的消息将直接丢弃;
  • 一个消息只能被一个消费者获取,要实现消息同时被多个消费者获取,要使用交换器绑定多个队列;
  • 消费者获取消息后,若是处理过程当中失败了没有返回应答,那消息会在队列中从新发送;

如下演示,能够建立两个控制台程序,而后在Main里面写相关代码进行测试

生产者发布消息代码

安装包 RabbitMQ.Client

//链接工厂
var factory = new ConnectionFactory(){
    UserName="",
    Password="",
    HostName="",
    Port=0
};
//建立链接
var connection = factory.CreateConnection();
//建立通道
var channel = connection.CreateModel();
//声明交换器,模式为direct
channel.ExchangeDeclare("exchangeName","direct");
//声明队列
channel.QueueDeclare("queueName",durable:true);
//将交换器跟队列进行绑定
channel.QueueBind("queueName","exchangeName","routeKey",null);
//发布消息
channel.BasicPublish("exchangeName","routeKey",null,Encoding.UTF8.GetBytes("hello world"));

channel.Close();
connection.Close();

 

消费者订阅消息

var factory = new ConnectionFactory
{
    UserName = "",
    Password = "",
    HostName = "",
};
//建立个链接
var connection = factory.CreateConnection();
//建立个通道
var channel = connection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
//定义事件消费者,及消费接收事件(返回应答)
consumer.Received += (o, e) =>
{
    var message = Encoding.UTF8.GetString(e.Body.ToArray());
    Console.WriteLine($"收到消息:{message}");
    channel.BasicAck(e.DeliveryTag, false);

};
//启动消费者,第二个参数是表明是否自动应答,false就得手动调用BasicAck方法
channel.BasicConsume("hello", false, consumer);

Console.WriteLine("消费者已启动");
Console.ReadKey();
channel.Close();
connection.Close();

 

CAP简单介绍

上面简单的介绍完RabbitMQ使用方法,下面再来简单说下CAP是干什么的

CAP可用于微服务分布式事务解决方案,就是能够搭建不一样站点,使用CAP,链接同一个RabbitMQ,部署在不一样的服务器上,实现分布式部署。

那要实现CAP,须要一个数据库来记录事件,须要一个队列来存放事件消息。

CAP更详情的文档可查看它的官网,重点有中文的 http://cap.dotnetcore.xyz/

建立一个WebApi初始项目来演示一下。

安装包 DotNetCore.CAP

安装包DotNetCore.CAP.SqlServer,这是提供Sqlserver来记录事件的包

安装包 DotNetCore.CAP.RabbitMQ,这是提供RabbitMQ来存放事件消息的包

安装包 DotNetCore.CAP.Dashboard,这是提供一个Web管理后台可查看发布、订阅消息状况

在Startup.cs的ConfigureServices方法中注入

services.AddCap(o=>{
    o.UseSqlServer("");
    o.UseRabbitMQ(mq => {
        mq.HostName = "";//RabbitMQ服务器地址
        mq.Port=5672;
        mq.UserName = "admin";
        mq.Password = "admin";
    });
    o.UseDashboard(); //添加监控仪表盘,经过http://localhost/cap访问

    o.FailedRetryInterval = 30;//失败后的重拾间隔,默认60秒
    o.FailedRetryCount = 10;//失败后的重试次数,默认50次;在FailedRetryInterval默认60秒的状况下,即默认重试50*60秒(50分钟)以后放弃失败重试
    o.SucceedMessageExpiredAfter = 60 * 60; //设置成功信息的删除时间默认24*3600秒
});

 

而后在Controllers目录下建立一个测试控制器

[ApiController]
[Route("[controller]/[action]")]
public class TestController : ControllerBase
{
    private readonly ICapPublisher _capPublisher;
    public TestController(ICapPublisher capPublisher)
    {

        _capPublisher = capPublisher;
    }
    [HttpPost]
    public void Test1()
    {
        //发布消息,消息被订阅处理后,会回调到Test.Callback
        _capPublisher.Publish<string>("Test.Event", "Hello,World","Test.Callback");
    }
    [NonAction]
    [CapSubscribe("Test.Event")] //订阅Test.Event事件
    public string Test2(string message)
    {
        //进行订阅消息处理
        Console.WriteLine(message);
        return "OK";
    }

    [NonAction]
    [CapSubscribe("Test.Callback")]
    public void TestCallback(string result)
    {
        //发布消息完成后的回调
        Console.WriteLine(result);
    }
}

 

好了,上面就简单的介绍了RabbitMQ跟CAP的使用方法,原本还在想这些东西适用于哪些场景,而后今天项目上线后出现问题了,里面涉及到两个系统的调用,一个系统A由于接口被频繁地调用超时,致使另外一个系统B一直显示出错,我就发现这个场景就很适合用这个CAP了。

系统A的崩溃不该影响到系统B,而系统A崩溃时也能够自动进行重试,当系统B发布消息后,也不用等待系统A,显示处理中,等系统A处理成功后再通知系统B,B再显示成功就能够了。

相关文章
相关标签/搜索