首先甩官网:http://www.rabbitmq.com/
然后是 .NET Client 链接:http://www.rabbitmq.com/dotnet.html
GitHub仓库:https://github.com/rabbitmq/rabbitmq-dotnet-client
下面直接进入正文,一共是两个主题:消费者怎么写?生产者怎么写?
在dotnet core mvc中,消费者肯定不能通过API或者其他的东西启动,理应是跟着程序一起启动的.
所以...
在dotnet core 2.0以上版本,我们直接用 IHostedService 接口实现.
直接上代码.
// RabbitListener.cs
// Base class: declares the exchange and queue, binds them and starts consuming.
// Each concrete consumer sets RouteKey / QueueName and overrides Process.
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Test.Listener
{
    /// <summary>
    /// Hosted service that consumes messages from a queue bound to the
    /// "message" topic exchange and hands each one to <see cref="Process"/>.
    /// </summary>
    public class RabbitListener : IHostedService
    {
        private readonly IConnection connection;
        private readonly IModel channel;

        /// <summary>
        /// Opens the connection and channel using the supplied options.
        /// A failure is written to the console and leaves the listener
        /// unconnected; <see cref="Register"/> then reports it explicitly.
        /// </summary>
        /// <param name="options">Application configuration holding the RabbitMQ host, credentials and port.</param>
        public RabbitListener(IOptions<AppConfiguration> options)
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    // Settings from this sample's configuration; adapt to your own.
                    HostName = options.Value.RabbitHost,
                    UserName = options.Value.RabbitUserName,
                    Password = options.Value.RabbitPassword,
                    Port = options.Value.RabbitPort,
                };
                this.connection = factory.CreateConnection();
                this.channel = connection.CreateModel();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
            }
        }

        /// <summary>Starts consuming when the host starts.</summary>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            Register();
            return Task.CompletedTask;
        }

        protected string RouteKey;
        protected string QueueName;

        /// <summary>
        /// Handles one message. Subclasses must override this.
        /// </summary>
        /// <param name="message">Message body decoded as UTF-8 text.</param>
        /// <returns>true to acknowledge the message; false to reject it.</returns>
        public virtual bool Process(string message)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Declares the exchange and queue, binds them with <see cref="RouteKey"/>
        /// and registers the consumer callback.
        /// </summary>
        /// <exception cref="InvalidOperationException">The constructor failed to open a channel.</exception>
        public void Register()
        {
            if (channel == null)
            {
                // Surface the earlier init failure instead of a NullReferenceException.
                throw new InvalidOperationException(
                    "RabbitListener has no open channel; see the init error written by the constructor.");
            }

            Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}");
            channel.ExchangeDeclare(exchange: "message", type: "topic");
            channel.QueueDeclare(queue: QueueName, exclusive: false);
            channel.QueueBind(queue: QueueName, exchange: "message", routingKey: RouteKey);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                bool handled;
                try
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    handled = Process(message);
                }
                catch (Exception ex)
                {
                    // A throwing handler must not leave the delivery unacknowledged.
                    Console.WriteLine($"RabbitListener process error,ex:{ex.Message}");
                    handled = false;
                }

                if (handled)
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
                else
                {
                    // Reject without requeue so an unprocessable message is not redelivered in a loop.
                    channel.BasicReject(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue: QueueName, consumer: consumer);
        }

        /// <summary>Closes the connection if it was opened and is still open.</summary>
        public void DeRegister()
        {
            if (connection != null && connection.IsOpen)
            {
                connection.Close();
            }
        }

        /// <summary>Stops consuming when the host shuts down.</summary>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            DeRegister();
            return Task.CompletedTask;
        }
    }
}

// ChapterLister.cs
// A sample subclass.
using System;
using System.Text;
using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

namespace Test.Listener
{
    /// <summary>
    /// Consumer for routing key "done.task" on queue "lemonnovelapi.chapter".
    /// </summary>
    public class ChapterLister : RabbitListener
    {
        private readonly ILogger<RabbitListener> _logger;

        // Process runs inside the consumer callback, outside any request scope,
        // so other services cannot be injected directly. Resolve them from a
        // scope created on IServiceProvider instead.
        private readonly IServiceProvider _services;

        public ChapterLister(IServiceProvider services, IOptions<AppConfiguration> options, ILogger<RabbitListener> logger)
            : base(options)
        {
            base.RouteKey = "done.task";
            base.QueueName = "lemonnovelapi.chapter";
            _logger = logger;
            _services = services;
        }

        /// <summary>
        /// Parses the message as JSON and processes it inside a fresh DI scope.
        /// </summary>
        /// <param name="message">Message body decoded as UTF-8 text.</param>
        /// <returns>true when handled; false when the message cannot be processed.</returns>
        public override bool Process(string message)
        {
            try
            {
                // Parsing sits inside the try so malformed JSON yields false instead of throwing.
                var taskMessage = JToken.Parse(message);
                if (taskMessage == null)
                {
                    // Returning false tells the base class this message cannot be handled.
                    return false;
                }

                using (var scope = _services.CreateScope())
                {
                    // Placeholder: resolve the scoped service(s) you need and do the real work here.
                    var xxxService = scope.ServiceProvider.GetRequiredService<XXXXService>();
                    return true;
                }
            }
            catch (Exception ex)
            {
                _logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}");
                _logger.LogError(-1, ex, "Process fail");
                return false;
            }
        }
    }
}
然后,记住....
注入到Startup.cs的时候,使用AddHostedService函数
services.AddHostedService<ChapterLister>();
消费者就这样玩了.
接下来是生产者,这个其实更简单.
using System;
using System.Net;
using Newtonsoft.Json.Linq;
using RestSharp;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using Newtonsoft.Json;
using System.Text;

namespace Test.SDK
{
    /// <summary>
    /// Publishes JSON-serialized messages to the "message" topic exchange.
    /// Intended to be registered as a singleton; it holds one connection and
    /// one channel for its lifetime and releases them in <see cref="Dispose()"/>.
    /// </summary>
    public class RabbitMQClient : IDisposable
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly ILogger _logger;

        // The single channel is shared by every caller of this singleton,
        // so publishing is serialized through this lock.
        private readonly object _publishLock = new object();

        /// <summary>
        /// Opens the connection and channel using the supplied options.
        /// A failure is logged and leaves the client unconnected;
        /// <see cref="PushMessage"/> then reports it explicitly.
        /// </summary>
        /// <param name="options">Application configuration holding the RabbitMQ host, credentials and port.</param>
        /// <param name="logger">Logger for init failures and publish traces.</param>
        public RabbitMQClient(IOptions<AppConfiguration> options, ILogger<RabbitMQClient> logger)
        {
            _logger = logger;
            try
            {
                var factory = new ConnectionFactory()
                {
                    HostName = options.Value.RabbitHost,
                    UserName = options.Value.RabbitUserName,
                    Password = options.Value.RabbitPassword,
                    Port = options.Value.RabbitPort,
                };
                // Keep the connection in a field so it can be released on dispose.
                _connection = factory.CreateConnection();
                _channel = _connection.CreateModel();
            }
            catch (Exception ex)
            {
                logger.LogError(-1, ex, "RabbitMQClient init fail");
            }
        }

        /// <summary>
        /// Serializes <paramref name="message"/> to JSON and publishes it to the
        /// "message" exchange with the given routing key.
        /// </summary>
        /// <param name="routingKey">Routing key used by the topic exchange.</param>
        /// <param name="message">Payload; serialized with Json.NET and encoded as UTF-8.</param>
        /// <exception cref="InvalidOperationException">The constructor failed to open a channel.</exception>
        public virtual void PushMessage(string routingKey, object message)
        {
            if (_channel == null)
            {
                // Surface the earlier init failure instead of a NullReferenceException.
                throw new InvalidOperationException(
                    "RabbitMQClient has no open channel; see the \"RabbitMQClient init fail\" log entry.");
            }

            _logger.LogInformation($"PushMessage,routingKey:{routingKey}");

            string msgJson = JsonConvert.SerializeObject(message);
            var body = Encoding.UTF8.GetBytes(msgJson);

            lock (_publishLock)
            {
                // Publishing targets an exchange, so declare the exchange (same name
                // and type the listener declares) rather than a queue called "message".
                _channel.ExchangeDeclare(exchange: "message", type: "topic");
                _channel.BasicPublish(exchange: "message", routingKey: routingKey, basicProperties: null, body: body);
            }
        }

        /// <summary>Releases the channel and the connection.</summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>Releases the channel and the connection when <paramref name="disposing"/> is true.</summary>
        protected virtual void Dispose(bool disposing)
        {
            if (!disposing)
            {
                return;
            }

            _channel?.Dispose();
            _connection?.Dispose();
        }
    }
}
切记注入实例的时候用单例模式.
services.AddSingleton<RabbitMQClient, RabbitMQClient>();
全文完...