.Net Core下使用RabbitMQ比较完备的两种方案(虽然代码有点惨淡,不过我会完善)

1、前言html

    上篇说给你们来写C#和Java的方案,最近工做也比较忙,迟到了一些,我先给你们补上C#的方案,另外若是没看我上篇博客的人最好看一下,不然你可能看的云里雾里的,这里我就不进行具体的方案画图了;传送门git

2、使用的插件github

    HangFireweb

    一个开源的.NET任务调度框架,最大特色在于内置提供集成化的控制台,方便后台查看及监控,支持多种存储方式;在方案中主要使用定时任务作补偿机制,后期可能会封装一些,能经过页面的形式直接添加任务;sql

   NLogapi

   日志记录框架,方案中使用记录日志,后期可能回集成多个日志框架;缓存

   Autofac网络

   依赖注入的框架,应该不用作过多介绍;框架

  SqlSugardom

  ORM框架,这个从刚开始我就在使用了,在如今公司没有推行起来,不过在上两家公司都留下的遗产,听说还用的能够,固然我仍是最佩服做者;

  Polly

  容错服务框架,相似于Java下的Hystrix,主要是为了解决分布式系统中,系统之间相互依赖,可能会由于多种因素致使服务不可用的而产生的一套框架,支持服务的超时重试、限流、熔断器等等;

  RabbitMQ.Client

  官方提供的C#链接RabbitMQ的SDK;

3、方案

  模拟一个简单订单下单的场景,没有进行具体的实现。同时建议下游服务不要写在web端,最好以服务的形式奔跑,代码中是Web端实现的,你们不要这么搞。总体上仍是实现了以前提到的两种方案:一是入库打标,二是延时队列(这块没有进行很好的测试,可是估计也没有很大的问题);固然也是有一些特色:RabbitMQ宕机状况下无需重启服务,网络异常的状况下也能够进行断线重连。接下来聊下代码和各方插件在系统中的具体应用:

  项目结构:

  

  RabbitMQExtensions:

  采用Autofac按照单例的形式注入,采用Polly进行断线重连,也开启了自身断线重连和心跳检测机制,配置方面采用最简单的URI规范进行配置,有兴趣参考下官方,总体上这块代码还相对比较规范,之后可能也不会有太多调整;

    /// <summary>
    /// rabbitmq持久化链接
    /// </summary>
    public interface IRabbitMQPersistentConnection
    {
        bool IsConnected { get; }

        bool TryConnect();

        IModel CreateModel();
    }
     /// <summary>
    /// rabbitmq持久化链接具体实现
    /// </summary>
    public class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentConnection
    {
        private readonly IConnectionFactory connectionFactory;
        private readonly ILogger<DefaultRabbitMQPersistentConnection> logger;

        private IConnection connection;

        private const int RETTRYCOUNT = 6;

        private static readonly object lockObj = new object();
        public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger)
        {
            this.connectionFactory = connectionFactory;
            this.logger = logger;
        }

        public bool IsConnected
        {
            get
            {
                return connection != null && connection.IsOpen;
            }
        }

        public void Cleanup()
        {
            try
            {
                connection.Dispose();
                connection.Close();
                connection = null;

            }
            catch (IOException ex)
            {
                logger.LogCritical(ex.ToString());
            }
        }

        public IModel CreateModel()
        {
            if (!IsConnected)
            {
                connection.Close();
                throw new InvalidOperationException("链接不到rabbitmq");
            }
            return connection.CreateModel();
        }

        public bool TryConnect()
        {
            logger.LogInformation("RabbitMQ客户端尝试链接");

            lock (lockObj)
            {
                if (connection == null)
                {
                    var policy = RetryPolicy.Handle<SocketException>()
                        .Or<BrokerUnreachableException>()
                        .WaitAndRetry(RETTRYCOUNT, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>    
                        {
                            logger.LogWarning(ex.ToString());
                        });

                    policy.Execute(() =>
                    {
                        connection = connectionFactory.CreateConnection();
                    });
                }



                if (IsConnected)
                {
                    connection.ConnectionShutdown += OnConnectionShutdown;
                    connection.CallbackException += OnCallbackException;
                    connection.ConnectionBlocked += OnConnectionBlocked;

                    logger.LogInformation($"RabbitMQ{connection.Endpoint.HostName}获取了链接");

                    return true;
                }
                else
                {
                    logger.LogCritical("没法建立和打开RabbitMQ链接");

                    return false;
                }
            }
        }


        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {

            logger.LogWarning("RabbitMQ链接异常,尝试重连...");

            Cleanup();
            TryConnect();
        }

        private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
        {

            logger.LogWarning("RabbitMQ链接异常,尝试重连...");

            Cleanup();
            TryConnect();
        }

        private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
        {

            logger.LogWarning("RabbitMQ链接异常,尝试重连...");

            Cleanup();
            TryConnect();
        }
    }
View Code

  OrderDal

  SqlSugar的一些简单封装,有些小特色:你们能够能够经过配置来实现读写分离,采用仓储设计。若是不太喜欢这么写,也能够参考杰哥的作法;

    //仓储设计
    public interface IBaseDal<T> where T:class,new()
    {
        DbSqlSugarClient DbContext { get; }

        IBaseDal<T> UserDb(string dbName);
        IInsertable<T> AsInsertable(T t);
        IInsertable<T> AsInsertable(T[] t);
        IInsertable<T> AsInsertable(List<T> t);
        IUpdateable<T> AsUpdateable(T t);
        IUpdateable<T> AsUpdateable(T[] t);
        IUpdateable<T> AsUpdateable(List<T> t);
        IDeleteable<T> AsDeleteable();

        List<T> GetList();
        Task<List<T>> GetListAnsync();

        List<T> GetList(Expression<Func<T,bool>> whereExpression);
        Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression);

        List<T> GetList(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc);
        Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc);

        List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page);
        Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page);

        List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc);
        Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc);

        int Count(Expression<Func<T, bool>> whereExpression);
        Task<int> CountAsync(Expression<Func<T, bool>> whereExpression);
        T GetById(dynamic id);
        T GetSingle(Expression<Func<T, bool>> whereExpression);
        Task<T> GetSingleAsync(Expression<Func<T, bool>> whereExpression);
        T GetFirst(Expression<Func<T, bool>> whereExpression);
        Task<T> GetFirstAsync(Expression<Func<T, bool>> whereExpression);

        bool IsAny(Expression<Func<T, bool>> whereExpression);
        Task<bool> IsAnyAsync(Expression<Func<T, bool>> whereExpression);

        bool Insert(T t);
        Task<bool> InsertAsync(T t);
        bool InsertRange(List<T> t);
        Task<bool> InsertRangeAsync(List<T> t);
        bool InsertRange(T[] t);
        Task<bool> InsertRangeAsync(T[] t);
        int InsertReturnIdentity(T t);
        Task<long> InsertReturnIdentityAsync(T t);


        bool Delete(Expression<Func<T, bool>> whereExpression);
        Task<bool> DeleteAsync(Expression<Func<T, bool>> whereExpression);
        bool Delete(T t);
        Task<bool> DeleteAsync(T t);
        bool DeleteById(dynamic id);
        Task<bool> DeleteByIdAsync(dynamic id);
        bool DeleteByIds(dynamic[] ids);
        Task<bool> DeleteByIdsAsync(dynamic[] ids);


        bool Update(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression);
        Task<bool> UpdateAsync(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression);
        bool Update(T t);
        Task<bool> UpdateAsync(T t);
        bool UpdateRange(T[] t);
        Task<bool> UpdateRangeAsync(T[] t);


        void BeginTran();
        void CommitTran();
        void RollbackTran();


    }
View Code

  OrderCommon

  定义全局异常的中间件,还有包含一些用到的实体等等,这部分代码还可优化拆分一下;

  OrderService

  生产者和消费者的具体实现,这块我还想在改造一番,将消费和业务分割开,如今写的很凌乱,不建议这么写,先把代码放出来,看看你们赞同不赞同个人这些用法,能够讨论,也欢迎争论,虽然这块代码写的很差,可是其实里面涉及一些RabbitMQ回调函数的用法,也是比较重要的,没有这些函数也就实现不了我上面说那两个特色;

//RabbitMQ宕机之后回调
//客户端这块你们不要采用递归调用恢复连接
//具体为何你们能够测试下,这里留点小疑问哈哈
connection.ConnectionShutdown += OnConnectionShutdown;

//消费端异常之后回调
consumerchannel.CallbackException += OnOnConsumerMessageAndWriteMessageLogException;

  Order

  具体的调用者,你们应该根据方法名字就能区分出我上面提到的两种方案的设计,总体的设计思路都是最终一致,延时队列发送消息这块最终也是能够经过定时任务来实现最终一致,实现方式有不少种,简单来讲下能够经过入库时生成的缓存机制,经过定时任务来进行补偿实现,这块我没有进行具体实现,有兴趣咱们能够探讨下这个方案;

    [Route("api/[controller]/[action]")]
    [ApiController]
    public class OrderController : ControllerBase
    {
        private readonly IBaseDal<OrderMessageLogEntity> orderBaseDal;

        private readonly IMessageService<OrderMessageLogEntity> messageService;

        private readonly IConsumerMessageService consumerMessageService;

        private const string EXCHANGENAME = "order";

        private const string QUEUENAME = "order";

        private const string ROUTINGKEY = "order";


        public OrderController(IBaseDal<OrderMessageLogEntity> orderBaseDal, IMessageService<OrderMessageLogEntity> messageService,IConsumerMessageService consumerMessageService)
        {
            this.orderBaseDal = orderBaseDal;
            this.messageService = messageService;
            this.consumerMessageService = consumerMessageService;
        }

        /// <summary>
        /// 建立订单
        /// </summary>
        /// <returns></returns>
        public ActionResult<bool> CreateOrder(long userId)
        {
            //建立订单成功
            OrderEntity orderEntity = new OrderEntity();
            Random random= new Random();
            orderEntity.OrderId = random.Next();
            orderEntity.OrderNo = random.Next();
            orderEntity.UserId = userId;
            orderEntity.OrderInfo = random.Next() + "详情";
            //bool isCreateOrderSuccress = orderService.CreateOrder(orderId);
            //if (!isCreateOrderSuccress)
            //{
            //    throw new Exception("建立订单失败");
            //}
            //建立订单成功之后开始入消息记录库
            //消息建议设计的冗余一些方便之后好查询
            //千万级之后连表太困难
            //建议冗余的信息有用户信息、订单信息、方便之后按照这个核对信息
            //消息表的建议是按照不一样的业务进行分表存储
            Random messageRandom = new Random();
            OrderMessageLogEntity orderMessageLog = new OrderMessageLogEntity();
            orderMessageLog.MessageId = messageRandom.Next();
            orderMessageLog.MessageInfo = orderEntity.OrderId+"订单信息";
            orderMessageLog.Status = (int)MessageStatusEnum.SENDING;
            orderMessageLog.OrderId = orderEntity.OrderId;
            orderMessageLog.UserId = orderEntity.UserId;
            orderMessageLog.CreateTime = DateTime.Now;
            orderMessageLog.UpdateTime = DateTime.Now;
            orderMessageLog.TryCount = 0;
            orderMessageLog.NextRetryTime = DateTime.Now.AddMinutes(5);
            //必须保证消息先落库
            bool isCreateOrderMessageLosSuccess = orderBaseDal.Insert(orderMessageLog);
            if (!isCreateOrderMessageLosSuccess)
                throw new Exception("消息入库异常");

            Message message = new Message();
            message.ExchangeName = EXCHANGENAME;
            message.QueueName = QUEUENAME;
            message.MessageId = orderMessageLog.MessageId;
            message.RoutingKey = ROUTINGKEY;
            message.Body = Encoding.UTF8.GetBytes(orderMessageLog.MessageInfo);


            //落库成功之后开始发送消息到MQ
            //这个地方采用最终一致而不去使用分布式事物最终一致
            messageService.SendMessage(message, orderMessageLog);



            return true;
        }


        /// <summary>
        /// 消费订单
        /// </summary>
        /// <returns></returns>
        public ActionResult<bool> ConsumerOrder()
        {
            Message message = new Message();
            message.ExchangeName = EXCHANGENAME;
            message.QueueName = QUEUENAME;
            message.RoutingKey = ROUTINGKEY;

            consumerMessageService.ConsumerMessage();

            return true;
        }



        /// <summary>
        /// 经过延时队列发送消息
        /// </summary>
        /// <param name="userId"></param>
        /// <returns></returns>
        public ActionResult<bool> CreateDelayCreateOrder(long userId)
        {
            //建立订单成功
            OrderEntity orderEntity = new OrderEntity();
            Random random = new Random();
            orderEntity.OrderId = random.Next();
            orderEntity.OrderNo = random.Next();
            orderEntity.UserId = userId;
            orderEntity.OrderInfo = random.Next() + "详情";
            //bool isCreateOrderSuccress = orderService.CreateOrder(orderId);
            //if (!isCreateOrderSuccress)
            //{
            //    throw new Exception("建立订单失败");
            //}
            //建立订单成功之后开始入消息记录库
            //消息建议设计的冗余一些方便之后好查询
            //千万级之后连表太困难
            //建议冗余的信息有用户信息、订单信息、方便之后按照这个核对信息
            //消息表的建议是按照不一样的业务进行分表存储
            Random messageRandom = new Random();
            OrderMessageLogEntity orderMessageLog = new OrderMessageLogEntity();
            orderMessageLog.MessageId = messageRandom.Next();
            orderMessageLog.MessageInfo = orderEntity.OrderId + "订单信息";
            orderMessageLog.Status = (int)MessageStatusEnum.SENDING;
            orderMessageLog.OrderId = orderEntity.OrderId;
            orderMessageLog.UserId = orderEntity.UserId;
            orderMessageLog.CreateTime = DateTime.Now;
            orderMessageLog.UpdateTime = DateTime.Now;
            orderMessageLog.TryCount = 0;
            orderMessageLog.NextRetryTime = DateTime.Now.AddMinutes(5);
            ////必须保证消息先落库
            //bool isCreateOrderMessageLosSuccess = orderBaseDal.Insert(orderMessageLog);
            //if (!isCreateOrderMessageLosSuccess)
            //    throw new Exception("消息入库异常");

            Message message = new Message();
            message.ExchangeName = EXCHANGENAME;
            message.QueueName = QUEUENAME;
            message.MessageId = orderMessageLog.MessageId;
            message.RoutingKey = ROUTINGKEY;
            message.Body = Encoding.UTF8.GetBytes(orderMessageLog.MessageInfo);

            //这里的设计是不进行落库
            //假如两条消息都失败必须借助定时任务去对比消息库和订单库的消息id而后进行再补发
            //剩下的只要有一条发送成功其实就能保证下游必然会消费调这条消息,排除下游消费异常的状况 这个地方我不在进行实现本身可脑补一下
            //开始发送消息到MQ
            messageService.SendMessage(message, orderMessageLog);

            //发送延时消息
            messageService.SendDelayMessage(message, orderMessageLog);

            return true;

        }

        /// <summary>
        /// 消费消息之后并入库
        /// </summary>
        /// <returns></returns>
        public ActionResult<bool> ConsumerOrderAndWirteMessageLog()
        {
            consumerMessageService.ConsumerMessageAndWriteMessageLog();

            return true;
        }


        /// <summary>
        /// 消费延时消息
        /// 进行二次检查核对
        /// </summary>
        /// <returns></returns>
        public ActionResult<bool> ConsumerDelayOrder()
        {
            consumerMessageService.ConsumerDelayMessage();

            return true;
        }
    }
View Code

  HangfireExtensions

  Hangfire定时框架,采用Mysql做为持久层的存储,写的也比较清晰,后期就是针对这些进行扩展,实如今界面就能添加定时任务;

4、结束

  生产端和消费端这段代码写的凌乱,但愿你们不要介意这一点,是有缘由的,这里我就不说了。但愿你们看到闪光点,不要在一点上纠结;下次会加入Elasticsearch和监控部分的时候我会把这块代码改掉,还你们一片整洁的世界;

  Github地址:https://github.com/wangtongzhou520/rabbitmq.git  有什么问题你们能够问我;

  欢迎你们加群438836709!欢迎你们关注我!

  

相关文章
相关标签/搜索