基础拾遗----RabbitMQ

基础拾遗

基础拾遗------特性详解html

基础拾遗------webservice详解java

基础拾遗------redis详解git

基础拾遗------反射详解github

基础拾遗------委托详解web

基础拾遗------接口详解redis

基础拾遗------泛型详解sql

基础拾遗-----依赖注入数据库

基础拾遗-----数据注解与验证c#

基础拾遗-----mongoDB操做安全

基础拾遗----RabbitMQ

前言

  消息队列,在高并发环境下,因为来不及同步处理,请求每每会发生堵塞,好比说双十一不少人进行下单,购买物品这是对于数据的操做是很是之大的,不论是是insert仍是update是否是都有及时操做数据库,那么就有可能形成数据库思索移除什么堆积阻塞。那么咱们这时是否是加入异步,nosql是否是能减轻其压力,那么这中间剑气的桥梁就是mq了,固然她的使用场景有不少,咱们接下来把社么是消息队列了解清楚它是怎么一回事以后,但愿你们能在本身的项目中灵活应用便可。

消息队列(MQ)

咱们先从图文上说一下它的使用场景,异步处理,应用解耦,流量削锋消息通信四个场景。由于之前开发过商城因此就如下载订单来叙述一下,他的适应场景吧。

异步处理

好比咱们下载订单后发送邮件与短信给使用者(简单举例通常不会哈)。那么咱们在写程序通常怎么处理呢?(1)把下单信息存入数据库中,调用邮件,短信接口,发送(并行发送或者一个一个发送),返回界面。可是咱们计算一下若是每一个操做时间为30ms那么最少也须要60ms,多的状况是90ms,

那么若是咱们加入消息队列将是一个怎样的状况呢,咱们先把下单信息存入数据库,同时把信息放到消息队列。而后就不用管它了。这样的话所用时间就是30ms+1ms(存消息队列)。其实放消息队列中仍是要管的的,但那是消费者的事和下单这个生产这无关。

应用解耦

  仍是商城下载订单的问题,当咱们商城下载订单,而后公司内部erp中库存管理相应库存进行同步。通常咱们怎么处理,下载完订单,调用erp系统,而后处理erp数据,接着把erp数据库中的信息进行同步到商城,这个时候处理上面提到的效率,还有一个问题,须要解决:若是两个系统不能同时访问,你会怎么作。那么咱们就要对两个系统进行解耦了对不对。这个时候消息队列就有了用武之地。以下图:其实消息队列在这个功能下,咱们的erp系统也有写入的时候,在这再也不累述业务,你们了解消息队列的用途便可。

流量削锋

作过商城的应该都会遇到这个问题,当举行活动是拥挤大量的用户,可能会是系统崩溃,这时候流量控制,和异常处理是一件特别重要的工做。固然请不要说在这其余方法,咱们不对其进行讨论,咱们尽对消息队列的使用作简单介绍。

消息通信

消息通信是指,消息队列通常都内置了高效的通讯机制,所以也能够用在纯的消息通信。好比实现点对点消息队列,或者聊天室等

以上咱们大体说了一下他的使用场景,那么不知道你们有没有了解到它究竟是个什么东西?

其实吧消息队列就是一个生产者,把相应消息(对象)放到消息队列(中间件中),而后它就什么都不用管了,接下来消费者(或者叫订阅者)去消息队列中间件中去获取订阅的信息,它本身再去处理。能解决的问题我们从上面的场景应该已经了解到了,解耦,提升效率。那么重点来了消息队列中间件又是什么呢?它都有哪些,又是怎么实现的呢?下面咱们就来了解其中的一个中间件RabbitMq。

RabbitMq

  你们大体知道什么是消息队列了,那么它的实现是什么样的呢?如今基本上也知道它实现重要的一环是消息对立中间件,rabbitmq,就是其中之一,其中还包括:Active MQ,Rocket Mq,Kafka,Zero MQ甚至也有人用redis来实现。

从个人角度来讲我去了解了两个AcctionMQ与RabbitMq这两种最终选择了它,也简单作了相应的封装,来我先来介绍一下RabbitMq.

  RabbitMQ是一个消息代理 - 一个消息系统的媒介。它能够为你的应用提供一个通用的消息发送和接收平台,而且保证消息在传输过程当中的安全。它提供的内部机制包括持久性机制、投递确认、发布者证明和高可用性机制,多协议,集群,联合咱们能够在实现的过程当中针对于性能与可靠性进行相应权衡。

  看一下:rabbitmq可视化工具以下(此可视化web的操做请你们自行查询):

其实消息队列的协议是AMQP,有不少对此的介绍在这再也不累述。结果上面的了解咱们大体知道它是个什么东西,不过咱们也要在此提一下,几个概念。消息、队列、路由(包括点对点和发布/订阅),生产者,消费者,具体解释我以为不须要了,就是你理解的字面意思。

  其中队列咱们通常用P来表示,消费者通常用C,队列(存消息的集合)用q。路由是R.多个消费者能够访问多个q。接下来开始咱们的实现了。

RabbitMq的代码实现

RabbitMq链接

首先看一下配置文件信息:

  <appSettings>
         <!--rabbitMQ-->
    <add key="serveraddress" value="amqp://192.168.0.76:5672/"/>
    <add key="virtualhost" value="erpadminvirtualhost"/>
    <add key="username" value="tx_junpin"/>
    <add key="password" value="abc.1234%"/>

  以上分别是访问服务地址,虚拟地址(可在可视化上手动添加,记得要加一条数据进去,而后删除,比如初始haunted同样),用户,密码。其中web访问地址通常为端口后改成“15672”.

链接关键数据准备好以后就是c# 中代码的实现了

 private RabbitConsumerConfig RBGetinfo;
        private ConnectionFactory cf = new ConnectionFactory();
        private IConnection conn; //创建联接

        /// <summary>
        /// 初始化Rabbit链接
        /// </summary>
        /// <param name="rbinfo"></param>
        public RabbitConsumer(RabbitConsumerConfig rbinfo)
        {
            RBGetinfo = rbinfo;
            cf = new ConnectionFactory()
            {
                UserName = RBGetinfo.UserName,
                Password = RBGetinfo.Password,
                VirtualHost = RBGetinfo.VirtualHost,
                RequestedHeartbeat = 0,
                Uri = RBGetinfo.ServerAddress
            };
            conn = cf.CreateConnection();

        }

以上ConnectionFactory 内部为中间件提供的链接工厂。方便与AMQP代理相关联的Connection。用兴趣的小伙伴请F12去看代码吧。

调用代码封装

   /// <summary>
        /// 队列出列的方法,传入处理队列中body的方法,并传入队列名称
        /// </summary>
        /// <param name="messageProcessAction">要执行的方法(委托)</param>
        /// <param name="queuename">队列名称</param>
     /// <param name="count">获取数据条数</param>
        public void ConsumeMessage(Action<string> messageProcessAction, string queuename, ushort count)
        {
            if (string.IsNullOrEmpty(queuename))
            {
                throw new ArgumentNullException("queuename");
            }
            CheckConn();
            using (IModel ch = conn.CreateModel())
            {
                //第二种取法QueueingBasicConsumer基于订阅模式
                QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
                ch.BasicQos(0, count, true);
                ch.BasicConsume(queuename, false, consumer);
                while (true)
                {
                    string message = "";
                    try
                    {
                        BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        IBasicProperties props = e.BasicProperties;
                        byte[] body = e.Body;
                        message = System.Text.Encoding.UTF8.GetString(body);
                        messageProcessAction.Invoke(System.Text.Encoding.UTF8.GetString(body).Replace("\0\0\0body\0\n", "").Replace("\0", "").ToString());
                        ch.BasicAck(e.DeliveryTag, false);
                    }
                    catch (Exception ex)
                    {
                        throw new RabbitException() { InternalException = ex, QueueName = queuename, RabbitInfo = RBGetinfo.ToString(), CurrentMessage = message };
                    }
                }
            }
        }
View Code

其中 CheckConn()判断是否链接若是没链接继续链接诶:

      private void CheckConn()
        {
            if (RBGetinfo != null && !IsOpen)
            {
                cf = new ConnectionFactory()
                {
                    UserName = RBGetinfo.UserName,
                    Password = RBGetinfo.Password,
                    VirtualHost = RBGetinfo.VirtualHost,
                    RequestedHeartbeat = 0,
                    Uri = RBGetinfo.ServerAddress
                };
                conn = cf.CreateConnection();
            }
        }
View Code

可能你们看到注释了,是的,RabbitMQ Consumer 获取消息有两种方式(poll、subscribe) 。-----订阅与轮询。咱们用的是订阅模式。写到者忽然件想有时间仍是要把上面提到的那个几个概念再梳理一下吧。

其中委托调用的方法:

    public void StockTBCExecute(string body)
        {
            logger.Error("StockTBCExecute" + body);
        }

你有可能会问。我可不能够定义委托方法为多个参数?我只能说,你看一下代码:

 byte[] body = e.Body;
  message = System.Text.Encoding.UTF8.GetString(body);
   messageProcessAction.Invoke(System.Text.Encoding.UTF8.GetString(body).Replace("\0\0\0body\0\n", "").Replace("\0", "").ToString());

至于可否扩展大家本身去研究吧。

向中间件插入数据

  public class RabbitMQManager
    {
        private static readonly string _serverAddress;
        private static readonly string _virtualHost;
        private static readonly string _userName;
        private static readonly string _password;
        private static readonly ILog _logger = LogManager.GetLogger(typeof(RabbitMQManager));
        private static RabbitProducer _rabbitProducer;

        static RabbitMQManager()
        {
            _serverAddress = ConfigurationManager.AppSettings["serveraddress"];
            _virtualHost = ConfigurationManager.AppSettings["virtualhost"];
            _userName = ConfigurationManager.AppSettings["username"];
            _password = ConfigurationManager.AppSettings["password"];

        }

        /// <summary>
        /// 交换连接信息
        /// </summary>
        /// <param name="routingKey">路由关键字</param>
        /// <param name="queueName">队列名称</param>
        /// <param name="message">消息内容</param>
        public static void SendRabbitMQ(string routingKey, string queueName, string message)
        {
            RabbitProducerConfig _rabbitConfig = new RabbitProducerConfig()
            {
                ServerAddress = _serverAddress,
                VirtualHost = _virtualHost,
                UserName = _userName,
                Password = _password,
                Exchange = "erp.service",
                ExchangeType = "direct",
                RoutingKey = routingKey
            };
            if (_rabbitProducer == null || !_rabbitProducer.IsOpen)
            {
                _rabbitProducer = new RabbitProducer(_rabbitConfig);
            }
            try
            {
                _rabbitProducer.ProduceMessage(message, queueName);
            }
            catch (Exception ex)
            {
                _logger.Error(ex);
            }
            finally
            {
                _rabbitProducer.Close();
            }
        }

      
    }
View Code

以上代码好像也没有什么好解释的,这里面用到的路由与于队列参数,基本上我使用的一个队列会对应一个路由,可是 rabbitmq并不是只有这种方式。

那么就在这多说一点吧。

RabbitMQ三种路由方式

Direct Exchange(直接路由)

任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue.(我封装的方法是这种)

1 .通常状况可使用rabbitMQ自带的Exchange:"(该Exchange的名字为空字符串,下文称其为 default Exchange)。
2 .这种模式下不须要将Exchange进行任何绑定(binding)操做
3 .消息传递时须要一个“RouteKey”,能够简单的理解为要发送到的队列名字。
4 .若是vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。

Fanout Exchange(广播路由)

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的全部Queue上。

1 .能够理解为路由表的模式
2 .这种模式不须要RouteKey
3 .这种模式须要提早将Exchange与Queue进行绑定,一个Exchange能够绑定多个Queue,一个Queue能够同多个Exchange进行绑定。
4 .若是接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃

Topic Exchange(主题订阅模式路由)

任何发送到Topic Exchange的消息都会被转发到全部关心RouteKey中指定话题的Queue上

1 .这种模式较为复杂,简单来讲,就是每一个队列都有其关心的主题,全部的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到全部关注主题能与RouteKey模糊匹配的队列。
2 .这种模式须要RouteKey,也许要提早绑定Exchange与Queue。
3 .在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心全部涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
4 .“#”表示 0 个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,没法与“log.warn.timeout”匹配;可是“log.#”能与上述二者匹配。
5 .一样,若是Exchange没有发现可以与RouteKey匹配的Queue,则会抛弃此消息。

最后的最后源码

源码:https://github.com/kmonkey9006/RabbitMQ

无论你是否了解上面我说的,你能够直接用下面的方法来使用我封装的这个类库:

插入指定队列一条数据:

   RabbitMQManager.SendRabbitMQ(RoutKey.RoutKey_stock_eshop, Queuen.Queuen_Stock_Eshop, "0108ZLY036");

获取队列中的数据(先进先出):

 public void Execute1()
        {
            while (true)
            {
                try
                {
                    if (rc1 == null || !rc1.IsOpen)
                    {
                        rc1 = new RabbitConsumer(rcc);
                    }
                    rc1.ConsumeMessage(StockEshopExecute, RabbitMQ.RabbitMqConst.Queuen.Queuen_Stock_Eshop, 1);
                }
                catch (Exception ex)
                {
                    logger.ErrorFormat("Execute1,异常:{0}", ex.Message);
                }
            }
        }

总结:

大体讲的是怎么在项目中使用,其中有不少细节与须要注意的东西详细阐述,你们可本身详细的去了解,针对于代码,我已经传到github上去,若是有什么问题你们能够给我提出来,有什么须要讨论的,请在公告来中找到QQ与我联系。

相关文章
相关标签/搜索