github:https://github.com/yunqian44/Azure.Storage.git
做者:Allen
版权:转载请在文章明显位置注明做者及出处。如发现错误,欢迎批评指正。
在以前介绍到 Azure Storage 第一篇文章中就有介绍到 Azure Storage 是 Azure 上提供的一项存储服务,Azure 存储包括 对象、文件、磁盘、队列和表存储。这里的提到的队列(Queue)就是今天要分享的内容。html
惯例,先来一些微软的官方解释git
1,什么是 Azure Queue Storage?github
答:Azure 队列存储是一项实现基于云的队列的 Azure 服务。 每一个队列都保留一个消息列表。 应用程序组件使用 REST API 或 Azure 提供的客户端库访问队列。 一般状况下,将有一个或多个“发送方”组件以及一个或多个“接收方”组件。 发送方组件将消息添加到队列。 接收方组件检索队列前面的消息以进行处理。 下图显示多个将消息添加到 Azure 队列的发送者应用程序以及一个检索消息的收件方应用程序。windows
队列中的消息是最大为 64 KB 的字节数组。 任何 Azure 组件都不会解释消息内容。若是要建立结构化消息,可使用 XML 或 JSON 格式化消息内容。 代码负责生成并解释自定义格式。数组
--------------------我是分割线--------------------bash
Azure Blob Storage 存储系列:async
2,Azure Storage 系列(二) .NET Core Web 项目中操做 Blob 存储post
3,Azure Storage 系列(三)Blob 参数设置说明测试
4,Azure Storage 系列(四)在.Net 上使用Table Storage
5,Azure Storage 系列(五)经过Azure.Cosmos.Table 类库在.Net 上使用 Table Storage
6,Azure Storage 系列(六)使用Azure Queue Storage
选择 cnbateblogaccount 左侧菜单的 “Queue service=》Queues” ,点击 “+ Queue”
Queue name:“blogmessage”
点击 “OK”
2.1,安装 “Azure.Storage.Queues” 的Nuget
使用程序包管理控制台进行安装
Install-Package Azure.Storage.Queues -Version 12.4.2
2.2,建立IQueueService 接口,和 QueueService 实现类,Queue控制器方法等
1 public interface IQueueService 2 { 3 /// <summary> 4 /// 插入Message 5 /// </summary> 6 /// <param name="msg">msg</param> 7 /// <returns></returns> 8 Task AddMessage(string msg); 9 10 /// <summary> 11 /// 获取消息 12 /// </summary> 13 /// <returns></returns> 14 IAsyncEnumerable<string> GetMessages(); 15 16 /// <summary> 17 /// 更新消息 18 /// </summary> 19 /// <returns></returns> 20 Task UpdateMessage(); 21 22 /// <summary> 23 /// 处理消息 24 /// </summary> 25 /// <returns></returns> 26 Task ProcessingMessage(); 27 28 29 }
1 public class QueueService : IQueueService 2 { 3 private readonly QueueClient _queueClient; 4 5 public QueueService(QueueClient queueClient) 6 { 7 _queueClient = queueClient; 8 } 9 10 11 12 /// <summary> 13 /// 添加消息 14 /// </summary> 15 /// <param name="msg">消息</param> 16 /// <returns></returns> 17 public async Task AddMessage(string msg) 18 { 19 // Create the queue 20 _queueClient.CreateIfNotExists(); 21 22 if (_queueClient.Exists()) 23 { 24 25 // Send a message to the queue 26 await _queueClient.SendMessageAsync(msg.EncryptBase64()); 27 } 28 } 29 30 public async IAsyncEnumerable<string> GetMessages() 31 { 32 if (_queueClient.Exists()) 33 { 34 // Peek at the next message 35 PeekedMessage[] peekedMessage = await _queueClient.PeekMessagesAsync(); 36 for (int i = 0; i < peekedMessage.Length; i++) 37 { 38 //Display the message 39 yield return string.Format($"Peeked message: '{peekedMessage[i].MessageText.DecodeBase64()}'") ; 40 } 41 } 42 } 43 44 /// <summary> 45 /// 处理消息 46 /// </summary> 47 /// <returns></returns> 48 public async Task ProcessingMessage() 49 { 50 // 执行 getmessage(), 队头的消息会变得不可见。 51 QueueMessage[] retrievedMessage = await _queueClient.ReceiveMessagesAsync(); 52 try 53 { 54 //处理消息 55 56 57 // 若是在30s内你没有删除这条消息,它会从新出如今队尾。 58 // 因此正确处理一条消息的过程是,处理完成后,删除这条消息 59 await _queueClient.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt); 60 } 61 catch //(消息处理异常) 62 { } 63 } 64 65 /// <summary> 66 /// 更新已排队的消息 67 /// </summary> 68 /// <returns></returns> 69 public async Task UpdateMessage() 70 { 71 if (_queueClient.Exists()) 72 { 73 // Get the message from the queue 74 QueueMessage[] message = await _queueClient.ReceiveMessagesAsync(); 75 76 // Update the message contents 77 await _queueClient.UpdateMessageAsync(message[0].MessageId, 78 message[0].PopReceipt, 79 "Updated contents".EncryptBase64(), 80 TimeSpan.FromSeconds(60.0) // Make it invisible for another 60 seconds 81 ); 82 } 83 } 84 }
1 [Route("Queue")] 2 public class QueueExplorerController : Controller 3 { 4 5 private readonly IQueueService _queueService; 6 7 public QueueExplorerController(IQueueService queueSerivce) 8 { 9 this._queueService = queueSerivce; 10 } 11 12 [HttpPost("AddQueue")] 13 public async Task<ActionResult> AddQueue() 14 { 15 string msg = $"我是添加进去的第一个消息"; 16 await _queueService.AddMessage(msg); 17 return Ok(); 18 } 19 20 [HttpGet("QueryQueue")] 21 public ActionResult QueryQueue() 22 { 23 return Ok( _queueService.GetMessages()); 24 25 } 26 27 [HttpPut("UpdateQueue")] 28 public async Task<ActionResult> UpdateQueue() 29 { 30 await _queueService.UpdateMessage(); 31 return Ok(); 32 } 33 34 [HttpGet("ProcessingMessage")] 35 public async Task<ActionResult> ProcessingQueue() 36 { 37 await _queueService.ProcessingMessage(); 38 return Ok(); 39 } 40 }
重点:将新消息添加到队列的后面。可见性超时指定消息应该对Dequeue和Peek操做不可见的时间。消息内容必须是UTF-8编码的字符串,最大长度为64KB。
消息的格式必须能够包含在具备UTF-8编码。要在消息中包含标记,消息的内容必须为XML换码或Base64编码。在将消息添加到队列以前,将删除消息中全部未转义或编码的XML标记。
咱们这里使用Base64编码
public static class StringExtensions { public static string EncryptBase64(this string s) { byte[] b = Encoding.UTF8.GetBytes(s); return Convert.ToBase64String(b); } public static string DecodeBase64(this string s) { byte[] b = Convert.FromBase64String(s); return Encoding.UTF8.GetString(b); } }
2.3,添加对 QueueService,以及QueueClient 的以来注入
services.AddSingleton(x => new QueueClient("DefaultEndpointsProtocol=https;AccountName=cnbateblogaccount;AccountKey=e2T2gYREFdxkYIJocvC4Wut7khxMWJCbQBp8tPM2EJt37QaUUlflTPAlkoJzIlY29aGYt8WW0xx1bckO4hLKJA==;EndpointSuffix=core.windows.net", "blogmessage")); services.AddSingleton<IQueueService, QueueService>();
3.1,添加队列消息
咱们添加一条 “我是添加进去的第一个消息” 的Queue
postman 中输入 “localhost:9001/Queue/AddQueue”,点击 “Send”
接下来,咱们能够在 VS 的 “Cloud Explorer” 查看到对应的 “cnbateblogaccount” 的 Strorage Account,以及 “blogmessage” 的 Storage Queue
右键弹出选择页面,点击 “打开”
咱们能够看懂添加进去的 Queue 的信息 ,Queue 的过去时间由于咱们进行设置,这里默认是7天
3.2 查询Queue
postman 中输入 “localhost:9001/Queue/QueryQueue”,点击 “Send”,能够看到刚刚添加进去的Queue被查询出来了
3.3,更新Queue
postman 中输入 “localhost:9001/Queue/UpdateQueue”,点击 “Send”
注意:因为咱们在更新 Queue 的时候,设置了 Queue 的不可见时间为60秒,因此在更新操做完成后去查看 Queue 会找不到更新的那条Queue 的信息,稍等一下再去查看 就能够展现出更新的 Queue 的信息
更新的Queue的文本内容已经发生改变了
3.4 处理Queue
postman中输入 “localhost:9001/Queue/ProcessingMessage”,点击 “Send”
注意:由于这里只是作演示,因此就假象进行消息处理,处理完成后,删除这条消息。
能够看到已经没有了 Queue 的信息了
Ok,今天的分享就先到此结束