The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queuegit
消息dequeue时增长auto completegithub
public static async Task MessageDequeueAsync()
{
// Configure the MessageHandler Options in terms of exception handling, number of concurrent messages to deliver etc.
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
// Maximum number of Concurrent calls to the callback ProcessMessagesAsync
, set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
MaxConcurrentCalls = 1,windows
// Indicates whether MessagePump should automatically complete the messages after returning from User Callback. // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below. AutoComplete = false }; // Register the queue message handler and receive messages in a loop queueClient.RegisterMessageHandler( async (message, token) => { // Process the message await PostMessageToBot(message); loggerCore.Information($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}"); // Complete the message so that it is not received again. // This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode. await queueClient.CompleteAsync(message.SystemProperties.LockToken); }, //async (exceptionEvent) => //{ // // Process the exception // loggerCore.Error($"WeChat message dequeue exception:{exceptionEvent.Exception.Message}"); //} messageHandlerOptions); }
2.在Azure上设置duplication detect时间,由1~59s,设置为55s,大笔试lock duration 时间长,可供消费的时间。app
第一次使用消息队列,遇到了一些问题:同一个消息有屡次出列。是一个消息只入列一次,仍是屡次?仍是由于出列问题,出列了屡次?async
Azure service bus queue在Azure上建立一个service bus,在service bus 上建立一个 queue,建立的时候注意 enable duplicate detected message这个选项选上,防止消息重复入列。oop
找到SAS Policy: RootManageSharedAccessKey 的值,复制下来,用做connectstring。性能
Azure service bus sample on githubspa
namespace:Microsoft.Azure.ServiceBus;
初始化queueClient:.net
private static readonly Serilog.ILogger loggerCore = LoggerCoreFactory.GetLoggerCore(); const string ServiceBusConnectionString = "Endpoint=sb://{YOUR-NAMESPACE-ON-Azure}.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR-KEYS}"; const string QueueName = "{YOUR-QUEUE-NAME}"; static IQueueClient queueClient; static MessageQueueHandler() { queueClient = new QueueClient(ServiceBusConnectionString, QueueName); }
这里添加messageID 做为入列依据,防止同一消息屡次入列,而后不用频繁的关闭消息链接,影响性能。code
public static async Task MessageEnqueueAsync(string message) { // Send messages. await SendMessagesAsync(message); // don't close frequently //await queueClient.CloseAsync(); } static async Task SendMessagesAsync(string messageXML) { try { // Create a new message to send to the queue. var messageStr = ParseMessageType.Parse(messageXML); var msgId = messageStr.Body.MsgId.Value; var message = new Microsoft.Azure.ServiceBus.Message { MessageId = msgId,// avoid same message enqueue more than once Body = Encoding.UTF8.GetBytes(messageXML), }; loggerCore.Information($"message enqueue:{messageXML}"); // Send the message to the queue. await queueClient.SendAsync(message); } catch (Exception exception) { loggerCore.Error($"message enqueue error:{exception.ToString()}"); } }
public static async Task MessageDequeueAsync() { // please choice PeekLock mode queueClient = new QueueClient(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock); // Register the queue message handler and receive messages in a loop queueClient.RegisterMessageHandler( async (message, token) => { // Process the message await PostMessageToBot(message); loggerCore.Information($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}"); // Complete the message so that it is not received again. // This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode. await queueClient.CompleteAsync(message.SystemProperties.LockToken); }, async (exceptionEvent) => { // Process the exception loggerCore.Error($"WeChat message dequeue exception:{exceptionEvent.ToString()}"); }); }
这样消息就会根据入列的前后,逐次出列。
几个有帮助的连接分享一下:
1.Read best practice. 就是如何最好的使用Azure queue,避免不须要的开销。
2.Enable queue duplicate detection 启用消息重复检测项,防止同一消息屡次额enqueue。