NServiceBus提供了8种传输管道组件,分别是Learning、MSMQ、Azure Service Bus、Azure Service Bus (Legacy)、Azure Storage Queues、SQL Server、RabbitMQ、Amazon SQS。前两篇咱们主要用的是Learnning,这篇使用RabbitMQ,也能够直接用于生产。async
RabbitMQ不在NServiceBus下,是一个单独的组件,须要单独安装。
ui
var transport = config.UseTransport<RabbitMQTransport>(); transport.ConnectionString("host=192.168.80.129;username=admin;password=admin"); transport.UseDirectRoutingTopology();
class Program { static ILog log = LogManager.GetLogger<Program>(); static void Main(string[] args) { MainAsync().GetAwaiter().GetResult(); } static async Task MainAsync() { Console.Title = "Sample.ClientUI"; var config = new EndpointConfiguration("Sample.ClientUI"); config.UseSerialization<NewtonsoftSerializer>(); config.UsePersistence<InMemoryPersistence>(); config.EnableInstallers(); var transport = config.UseTransport<RabbitMQTransport>(); transport.ConnectionString("host=192.168.80.129;username=admin;password=admin"); transport.UseDirectRoutingTopology(); var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false); await RunAsync(endpointInstance).ConfigureAwait(false); await endpointInstance.Stop().ConfigureAwait(false); } static async Task RunAsync(IEndpointInstance endpointInstance) { log.Info("Press 'P' to send an PlaceOrder Command"); while (true) { var key = Console.ReadKey(); Console.WriteLine(); switch (key.Key) { case ConsoleKey.P: { var command = new PlaceOrder { OrderId = Guid.NewGuid().ToString() }; log.Info($"Sending PlaceOrder with OrderId:{command.OrderId}"); await endpointInstance.Send("Sample.Server",command).ConfigureAwait(false); break; } case ConsoleKey.Q: return; default: log.Info("Please try again"); break; } } } }
先启动ClientUI,启动后先发送几条数据,发送数据观察程序控制台数据是否发送,而后在RabbitMQ控制台里观察Queue的状况,没建立Queue时,这里会自动建立Queue,队列名称和端点名称相同。这里发送消息数据,数据会在Sample.Server队列里。
3d
这里能看到在队列Sample.Server里有三条未消费的数据,由于我尚未启动Sample.Server控制台程序。code
public class Program { static ILog log = LogManager.GetLogger<Program>(); static void Main(string[] args) { MainAsync().GetAwaiter().GetResult(); } static async Task MainAsync() { Console.Title = "Sample.Server"; var config = new EndpointConfiguration("Sample.Server"); config.UseSerialization<NewtonsoftSerializer>(); config.UsePersistence<InMemoryPersistence>(); config.EnableInstallers(); var transport = config.UseTransport<RabbitMQTransport>(); transport.ConnectionString("host=192.168.80.129;username=admin;password=admin"); transport.UseDirectRoutingTopology(); var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false); log.Info("Press any key to quit"); Console.ReadKey(); await endpointInstance.Stop().ConfigureAwait(false); } }
public class PlaceOrderHandler : IHandleMessages<PlaceOrder> { static ILog log = LogManager.GetLogger<PlaceOrderHandler>(); public Task Handle(PlaceOrder message, IMessageHandlerContext context) { log.Info($"Received PlaceOrder with OrderId:{message.OrderId}"); return Task.CompletedTask; } }
启动Sample.Server控制台后,就会消费Queue里的数据,消费完成后观察程序控制台的提示和RabbitMQ控制台的提示。
server
这里能看到刚才堆积的三条数据已经被消费了。blog
写完这个demo我就在想在程序里经过使用NServiceBus的RabbitMQ组件和直接在程序里使用RabbitMQ直接调用的区别,到底有没有必要经过NServiceBus调用,还须要进一步考量。rabbitmq