说到队列的话,你们必定不会陌生,可是扯到优先级队列的话,仍是有一部分同窗是不清楚的,多是不知道怎么去实现吧,其实呢,,,这东西已redis
经烂大街了。。。很简单,用“堆”去实现的,在咱们系统中有一个订单催付的场景,咱们客户的客户在tmall,taobao下的订单,taobao会及时将订单推送给后端
咱们,若是在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,可是,tmall商家对咱们来讲,确定是要分大客户和小客测试
户的对吧,好比像施华蔻,百雀林这样大商家一年起码可以给咱们贡献几百万,因此理应固然,他们的订单必须获得优先处理,而曾今咱们的后端系统是使优化
用redis来存放的定时轮询,你们都知道redis只能用List作一个简简单单的消息队列,并不能实现一个优先级的场景,因此订单量大了后采用rabbitmq进行ui
改造和优化,若是发现是大客户的订单给一个相对比较高的优先级,不然就是默认优先级,好了,废话很少说,咱们来看看如何去设置。spa
既然是优先级队列,那么必然要在Queue上开一个口子贴上一个优先级的标签,为了看怎么设置,咱们用一下rabbitmq的监控UI,看看这个里面是如何code
手工的建立优先级队列。orm
从这个图中能够看到在Arguments栏中有特别多的小属性,其中有一项就是"Maximum priority",这项的意思就是说能够定义优先级的最大值,其实blog
想一想也是,不可能咱们定义的优先级是一个很是大的数字,好比int.MaxValue,大多状况下都是10之内的数字就能够了,再或者咱们曾今接触过的 MSMQ,rabbitmq
它的优先级只是一些枚举值,什么High,Normal,Low,不知道你们能否记得? 下面来看下代码中该如何实现呢???
1. 在Queue上附加优先级属性
Dictionary<string, object> dic = new Dictionary<string, object>(); dic.Add("x-max-priority", 20); channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: dic);
上面的代码作了一个简单的队列声明,queuename="hello",持久化,排外。。。而后把"x-max-priority"塞入到字典中做为arguments参数,看起来还
是很是简单吧~~~
2. 在Message上指定优先级属性
var properties = channel.CreateBasicProperties(); properties.Priority = 1; channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
经过上面的代码能够看到,在Message上设置优先级,我是经过在channel通道上设置Priority属性,以后塞到basicProperties中就能够了,好了,有上面这两
个基础以后,下面就能够开始测试了,准备向rabbitmq推送10条记录,其中第5条的优先级最高,因此应该首先就print出来,以下图:
static void Main(string[] args) { var sb = new StringBuilder(); for (int i = 0; i < 11; i++) { sb.Append(i); } var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "mydirect", type: ExchangeType.Direct, durable: true); Dictionary<string, object> dic = new Dictionary<string, object>(); dic.Add("x-max-priority", 20); for (int i = 0; i < 10; i++) { channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: dic); string message = string.Format("{0} {1}", i, sb.ToString()); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Priority = (i == 5) ? (byte)10 : (byte)i; channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", i); } } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
图中能够看到10条消息我都送到rabbitmq中去了,接下来打开consume端,来看看所谓的index=5 是否第一个送出来??
static void Main(string[] args) { for (int m = 0; m < int.MaxValue; m++) { var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { var result = channel.BasicGet("hello", true); if (result != null) { var str = Encoding.UTF8.GetString(result.Body); Console.WriteLine("{0} 消息内容 {1}", m, str); System.Threading.Thread.Sleep(1); } } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
一切都是这么的完美,接下来为了进行可视化验证,你能够在WebUI中观察观察,能够发如今Queue上面多了一个 Pri 标记,有意思吧。
好了,这么重要的功能,是否是已经让你足够兴奋啦, 但愿你们可以好好的在实际场景中运用吧~~~