var connStr = "localhost:6379,password="; var db = 2; SiteRedisHelper redisHelper = new SiteRedisHelper(connStr, "monster", db); var key = "MessageQueue"; var msg = string.Empty; #endregion
消息写入+读取(入门版)html
#region 添加消息 while (true) { Console.WriteLine("请输入你须要发送的消息"); msg = Console.ReadLine(); if (!string.IsNullOrWhiteSpace(msg)) { // var listLeftPush = redisHelper.ListLeftPush(key, msg);//添加一条消息并返回已添加消息数量 var listLeftPushAsync = redisHelper.ListLeftPushAsync(key, msg);//异步添加 //追加事件 listLeftPushAsync.ContinueWith((task => { if (task.IsCompletedSuccessfully) { Console.WriteLine($"消息添加完毕,此消息队列共有{task.Result}条信息"); } })); } else { Console.WriteLine("中止发送消息"); break; } }; #endregion #region 读取消息 while (!string.IsNullOrWhiteSpace(msg = redisHelper.ListLeftPop(key))) { Console.WriteLine("消息出列:" + msg); Debug.WriteLine("消息出列:" + msg); FileLogTools.Write(msg, "RedisMSMQ.Try"); } #endregion
相对来讲仍是挺简单的,也没有赶上什么奇怪的异常,此处便不作什么太多说明redis
将实体作为消息进行写入/读取json
稍微改造了一下使用对象作为消息进行写入/读取 <实体类> public class MsgEntity { public string Content { get; set; } public DateTimeOffset CreateTime { get; set; } } <添加相关> var msgCount = redisHelper.ListLeftPush<MsgEntity>(key,new MsgEntity() { Content = msg, CreateTime = DateTimeOffset.Now }); Console.WriteLine($"添加成功,消息站已有{msgCount}条消息"); <读取的消息> 1.原始: ���� DRedisMSMQ.Try, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null RedisMSMQ.Try.Entity.MsgEntity <Content>k__BackingField<CreateTime>k__BackingFieldSystem.DateTimeOffset hello����System.DateTimeOffset DateTime OffsetMinutes 'NCN��� ... 一串乱码 + 一堆命名空间 看来写入须要调整 2.调整 : <old> private static byte[] Serialize(object obj) { try { if (obj == null) return null; var binaryFormatter = new BinaryFormatter(); using (var memoryStream = new MemoryStream()) { binaryFormatter.Serialize(memoryStream, obj); var data = memoryStream.ToArray(); return data; } } catch (SerializationException ex) { throw ex; } } <new> JsonConvert.SerializeObject(redisValue) 2.1 读取一样处理 <异常记录> 1.添加时异常:System.Runtime.Serialization.SerializationException:“Type 'RedisMSMQ.Try.Entity.MsgEntity' in Assembly 'RedisMSMQ.Try, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null' is not marked as serializable.” 说明:此对象在程序集中不可进行序列化 处理:给类添加特性[Serializable]
使用实体时,处理也是很是简单的,主要是注意一下转string的方式,
我的使用的是JsonConvert 进行序列化/反序列化(json比较简洁,工具类功能也比较齐全) 其次就是编码一致api
模拟简单的发布/订阅异步
#region 准备参数 var connStr = "localhost:6379,password="; var db = 2; SiteRedisHelper redisHelper = new SiteRedisHelper(connStr, "monster", db); var key = "entrepot"; var model = default(Produce); #endregion #region 审核线程,订阅申请消息,给予相应的处理 ThreadPool.QueueUserWorkItem((state => { Thread.Sleep(1000); while (IsDealValid) { var validMsg = redisHelper.ListRightPop<Produce>(key); if (validMsg != null && !validMsg.IsNull()) { Console.WriteLine($"正在审核产品:{JsonConvert.SerializeObject(validMsg)}"); } } })); #endregion #region 主线程_添加产品 Console.WriteLine("欢迎来到产品中心,请填写产品注册资料"); IsDealValid = true; while ((model = Produce.RegisterProduce())!= null) { var validCount = redisHelper.ListLeftPush<Produce>(key, model);//将注册资料添加到消息队列中 Console.WriteLine($"产品注册申请正在处理中……,在您以前共有{validCount-1}个产品正在处理,请耐心等待审核结果"); } #endregion
发布/订阅工具
#region 订阅消息 ThreadPool.QueueUserWorkItem((state => { redisHelper.Subscribe(channel, ((redisChannel, value) => { //Console.WriteLine($"订阅方收到一条消息:{JsonConvert.SerializeObject(value)}"); if (!value.IsNullOrEmpty) { Console.WriteLine($"订阅方收到一条消息:{value.ToString()}"); } })); Console.WriteLine("子线程已订阅消息"); })); #endregion #region 主线程发布消息 while ((model = Produce.RegisterProduce()) != null) { var receiveCount = redisHelper.Publish(channel, model); Console.WriteLine($"此条消息已被{receiveCount}我的订阅"); } #endregion 发布订阅 vs 消息队列 1. 消息队列中的消息不能重复读取,发布订阅中的消息由订阅方共享 2. 若发布时没有订阅方,后续加入的订阅方将不能收到此条消息。在消息队列中,若消息没有及时出列,消息将会继续保存在消息队列中
总结编码
整体来讲,redis的操做都是比较简单的,由于官方已经有集成api供咱们调用,因此操做起来仍是没什么难度,只须要了解方法的应用就能够了,复杂一点的,应该就是业务流程的一些具体应用,应用场景的使用,效率的提高线程
相关类说明:code
SiteRedisHelperorm
参考博文:http://www.cnblogs.com/liqingwen/archive/2017/04/06/6672452.html
《构造方法》 public SiteRedisHelper(string connStr, string defaultKey, int db = -1) { //链接字符串 ConnectionString = connStr; //创建链接 _connMultiplexer = ConnectionMultiplexer.Connect(ConnectionString); //默认前缀【无实用】 DefaultKey = defaultKey; //注册相关事件 【未应用】 RegisterEvent(); //获取Database操做对象 _db = _connMultiplexer.GetDatabase(db); }
author:monster
since:7/9/2018 11:25:14 AM
direction:redis mssq analysis