c#经过Redis实现轻量级消息组件

最近在开发一个轻量级ASP.NET MVC开发框架,须要加入日志记录,邮件发送,短信发送等功能,为了保持模块的独立性,因此须要经过消息通讯的方式进行处理,为了保持框架在部署,使用,二次开发过程当中的简易便捷性,因此没有选择传统的MQ,而是基于Redis的订阅发布实现一个系统内部消息组件,话很少说,上码!html

数据结构定义

消息实体包含几个部分,订阅通道名称,信息头,信息体,信息差别化额外信息字典,信息头主要包含消息标识,消息日期,信息体包含信息内容,信息实体类型等git

public class Message
    {
        public string MessageChannel { set; get; }
        public MessageHead @MessageHead { set; get; }
        public MessageBody @MessageBody { set; get; }

        [JsonExtensionData]
        public Dictionary<string,Object> @MessageExtra { set; get; }

        public Message()
        {

        }

        public void AddExtra(string Name, string Value)
        {
            if (@MessageExtra == null)
            {
                @MessageExtra = new Dictionary<string, object>();
            }
            @MessageExtra.Add(Name, Value);
        }

        public Object GetExtra(string Name)
        {
            return @MessageExtra[Name];
        }
    }

    public class MessageHead
    {
        public string MessageID { set; get; }
        public DateTime MessageDate { set; get; }

        public MessageHead()
        {
            MessageID = CommonUtil.CreateCommonGuid();
            MessageDate = DateTime.Now;
        }
    }

    public class MessageBody
    {
        public string MessageJsonContent { set; get; }
        public Type MessageMapperType { set; get; }
    }

注:由于消息订阅发布传递过程当中,我是经过Json序列化传输的,使用过程当中可能须要一些额外的键值对信息,这里在对象中定义的是Dictinary对象,可是Dictinary自己是不支持序列化的,因此须要加上注解JsonExtensionData数据库

订阅通道声明

咱们须要达到的效果是,在系统启动时,全部消息通道能够根据系统中的应用自动订阅,这里就须要一个注解来标识咱们的订阅通道接收消息的实现类数据结构

[AttributeUsage(AttributeTargets.Class)]
    public class MessageChanelAttribute : Attribute
    {
        private string _ChannleName;
        public string ChannelName
        {
            get
            {
                return this._ChannleName;
            }
            set
            {
                this._ChannleName = value;
            }

        }
    }

消息的个性化策略处理

Redis的三方库我这里使用的是StackExchange.Redis.dll,在消息订阅时,须要为Channel指定接收到消息时的处理委托,咱们在自动订阅的过程当中确定也要收集好各种消息处理类并与Channel一一对应,这时候咱们就须要一个基类FastDefaultMessageHandler,咱们的具体的消息处理类继承自FastDefaultMessageHandler,重写处理方法便可app

[Component]
    [MessageChanelAttribute(ChannelName = "DefaultMessage")]
    public class FastDefaultMessageHandler : IFastMessageHandle
    {
        [AutoWired]
        public DBUtil @DBUtil;

        public void HandleMessage(RedisChannel ChannelName, RedisValue Message)
        {
            FastExecutor.Message.Design.Message Entity = JsonConvert.DeserializeObject<FastExecutor.Message.Design.Message>(Message);
            try
            {
                if (!CheckMessageIsConsume(Entity))
                {
                    this.CustomHandle(Entity);
                }
            }
            catch (Exception e)
            {
                StringBuilder ExceptionLog = new StringBuilder();
                ExceptionLog.AppendFormat("异常Message所属Channel:{0}", Entity.MessageChannel + Environment.NewLine);
                ExceptionLog.AppendFormat("异常Message插入时间:{0}", Entity.MessageHead.MessageDate.ToString() + Environment.NewLine);
                ExceptionLog.AppendFormat("异常Message内容:{0}", Message + Environment.NewLine);
                ExceptionLog.AppendFormat("异常信息:{0}", e.Message + Environment.NewLine);
                LogUtil.WriteLog("Logs/MessageErrorLog", "log_", ExceptionLog.ToString() + Environment.NewLine);
                ExceptionLog.AppendFormat("========================================================================================================================================================================" + Environment.NewLine);
                MessageACK.MoveMessageToExceptionChannel(Entity.MessageChannel, Entity);
            }
            finally
            {
                MessageACK.ConfirmMessageFinish(Entity.MessageChannel, Entity.MessageHead.MessageID);
            }

        }

        public virtual void CustomHandle(FastExecutor.Message.Design.Message @Message)
        {

        }

        public virtual bool CheckMessageIsConsume(FastExecutor.Message.Design.Message @Message)
        {
            return false;
        }
    }

其中的HandleMessage方法就是咱们在订阅Channel时对应的委托,会调用类中的CustomHandle的虚方法,子类继承重写该方法就会基于多态进行策略调用,CheckMessageIsConsume方法是用于确认消息是否重复消费的,也能够被重写,下面看一个访问日志类的实例,使用MessageChanelAttribute标注声明该实现类须要订阅发布的Channel名称为Visit,CustomHandle方法中实现了插入数据库操做,CheckMessageIsConsume方法判断该条日志数据是否已消费(已经存在于数据库)框架

[MessageChanelAttribute(ChannelName = "Visit")]
    public class VisitLog : FastDefaultMessageHandler
    {
        public override void CustomHandle(Message.Design.Message Message)
        {
            Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
            @DBUtil.Insert(LogEntity);
            base.CustomHandle(Message);
        }

        public override bool CheckMessageIsConsume(Message.Design.Message Message)
        {
            Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
            DBRow Row = new DBRow("Frame_VisitLog", "RowGuid", LogEntity.RowGuid);
            if (Row.IsExist())
            {
                return true;
            }
            else
            {
                return false;
            }
        }
    }

消息自动订阅

咱们但愿系统在启动时就寻找出定义好Channel和实现类,自动实现订阅,这里就须要用到IOC容器,启动系统时将全部的消息处理类放入容器中,在自动订阅时所有取出来,根据消息处理类中声明的Channel名称进行自动订阅ide

public void Init()
        {
            List<Type> HandlerTypeList = InjectUtil.Container.GetRegistType(typeof(IFastMessageHandle));
            foreach (Type HandlerType in HandlerTypeList)
            {
                MessageChanelAttribute Channel = Attribute.GetCustomAttribute(HandlerType, typeof(MessageChanelAttribute)) as MessageChanelAttribute;
                RedisUtil.Subscribe(Channel.ChannelName, ((FastDefaultMessageHandler)InjectUtil.Container.Resolve(HandlerType)).HandleMessage);
            }
        }

注:学习

1.这里的IOC容器是我本身实现的,地址:https://gitee.com/grassprogramming/FastIOC,你们能够用AutoFac代替ui

2.RedisUtil是对StackExchange.Redis.dll封装的处理类,地址:https://gitee.com/grassprogramming/FastUtilthis

消息发送

消息只须要调用Redis的发布方法便可,将Channel名称与定义好的数据实体类传入,序列化为Json

public void SendMessage<T>(string ChannleName, T CustomMessageEntity, Dictionary<string, string> ExtraData = null)
        {
            FastExecutor.Message.Design.Message MessageEntity = new Design.Message();
            MessageEntity.MessageChannel = ChannleName;
            MessageHead Head = new MessageHead();
            MessageBody Body = new MessageBody();
            Body.MessageMapperType = typeof(T);
            Body.MessageJsonContent = JsonConvert.SerializeObject(CustomMessageEntity);
            MessageEntity.MessageHead = Head;
            MessageEntity.MessageBody = Body;
            if (ExtraData != null)
            {
                foreach (var item in ExtraData)
                {
                    MessageEntity.AddExtra(item.Key, item.Value);
                }
            }
            RedisUtil.Publish(ChannleName, MessageEntity);
            MessageACK.CopyMessageToACKList(ChannleName, MessageEntity);
        }

消息确认与存储

Redis做订阅发布模式做为消息组件的问题有两方面

问题:消息消费完没有确认机制

解决方案

基于Redis的Hash存储方式创建一个消息存储字段,在发送消息时拷贝到消息Hash字典中,消费完毕后再删除,对应SendMessage中的MessageACK.CopyMessageToACKList方法和FastDefaultMessageHandler中的MessageACK.ConfirmMessageFinish方法,本质就是对Hash字典的增长与删除功能

问题:消息处理端挂了再次重启消息会丢失

解决方案

确认机制已经保证了消息即便没有被消费完可是处理端宕机消息也不会丢失,须要注意的是,消息没有丢失仅仅是Hash字典中有存储,可是消息通道中不存在了,因此咱们在系统每次启动时扫描这个Hash字典,从新发布消息到Channel,这样可能致使重复消费,因此须要靠FastDefaultMessageHandler中的CheckMessageIsConsume方法判断,同时消息处理者自己处理异常咱们也须要记录下来,好比发短信供应商接口有问题,消息处理异常会进入Redis的ChannelException通道,咱们能够根据需求实现一个可视化界面决定是否经过手动恢复

最后

Message组件相关代码地址:https://gitee.com/grassprogramming/FastExecutor/tree/master/code/FastExecutor/FastExecutor.Message

存在不足问题:若是消息是单纯记录日志问题,没办法确认消息是否消费了

若是你们有什么好的建议,可留言一块儿交流学习,共同进步

原文出处:https://www.cnblogs.com/yanpeng19940119/p/11603865.html

相关文章
相关标签/搜索