EasyNetQ的使命是为基于RabbitMQ的消息传递提供最简单的API。 核心IBus接口有意避免公开AMQP概念,如交换,绑定和队列,而是实现基于消息类型的默认交换绑定队列拓扑。html
对于某些场景,可以配置您本身的exchange绑定队列拓扑是颇有用的;高级EasyNetQ API容许您这样作。高级API对AMQP有很好的理解。前端
高级API经过IAdvancedBus接口实现。 该接口的一个实例能够经过IBus的高级属性进行访问: var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced; 数组
1,声明交换机安全
要声明交换机,请使用IAdvancedBus的ExchangeDeclare方法:服务器
IExchange ExchangeDeclare( string name, string type, bool passive = false, bool durable = true, bool autoDelete = false, bool @internal = false, string alternateExchange = null, bool delayed = false);
name: 交换机名称
type: 有效的交换机类型(使用静态ExchangeType类的属性安全地声明交换)
passive: 不要建立交换。 若是指定的交换不存在,则抛出异常。 (默认为false)
durable: 生存服务器从新启动。 若是此参数为false,则在服务器从新启动时,交换将被删除。 (默认为true)
autoDelete: 最后一个队列未被绑定时删除此交换。 (默认为false)
internal: 这种交换不能由发布者直接使用,而只能由交换使用来交换绑定。 (默认为false)
alternateExchange:若是没法路由邮件,则将邮件路由到此交换机。
delayed:若是设置,则分配x延迟型交换以路由延迟的消息。app
①简单案例异步
// create a direct exchange var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Direct); // create a topic exchange var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Topic); // create a fanout exchange var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Fanout);
要得到RabbitMQ默认交换,请执行如下操做:函数
var exchange = Exchange.GetDefault();
2,声明队列oop
要声明队列,请使用IAdvancedBus的QueueDeclare方法:性能
IQueue QueueDeclare( string name, bool passive = false, bool durable = true, bool exclusive = false, bool autoDelete = false, int? perQueueMessageTtl = null, int? expires = null, byte? maxPriority = null, string deadLetterExchange = null, string deadLetterRoutingKey = null, int? maxLength = null, int? maxLengthBytes = null);
name: 队列的名称
passive:若是队列不存在,则不要建立该队列,而是引起异常(默认为false)
durable: 能够在服务器从新启动后继续运行 若是这是错误的,则在服务器从新启动时,队列将被删除。 (默认为true)
exclusive: 只能由当前链接访问,其余链接上来会抛异常。 (默认为false)
autoDelete: 全部消费者断开链接后删除队列。 (默认为false)
perQueueMessageTtl:丢弃以前,消息在队列中应保留多长时间(以毫秒为单位)。 (默认未设置)
expires: 自动删除以前,队列应该保持未使用状态的时间以毫秒为单位。 (默认未设置)
maxPriority: 肯定队列应支持的最大消息优先级。
deadLetterExchange:肯定交换机的名称在被服务器自动删除以前能够保持未使用状态。
deadLetterRoutingKey:若是设置,将路由消息与指定的路由密钥,若是未设置,则消息将使用与最初发布的路由密钥相同的路由。
maxLength: 队列中可能存在的最大可用消息数。 一旦达到限制,邮件就会从队列的前面被删除或死信,以便为新邮件腾出空间。
maxLengthBytes:队列的最大大小(以字节为单位)。 一旦达到限制,邮件就会从队列的前面被删除或死信,以便为新邮件腾出空间
请注意,若是定义了maxLength和/或maxLengthBytes属性,则RabbitMQ的行为可能并不如人们所指望的那样。 人们可能会指望代理拒绝进一步的消息; 可是RabbitMQ文档(https://www.rabbitmq.com/maxlength.html)代表,一旦达到限制,邮件将从队列的前端丢弃或死锁,以便为新邮件腾出空间。
①简单案例
// declare a durable queue var queue = advancedBus.QueueDeclare("my_queue"); // declare a queue with message TTL of 10 seconds: var queue = advancedBus.QueueDeclare("my_queue", perQueueTtl:10000);
要声明一个'未命名的'独占队列,其中RabbitMQ提供队列名称,请使用不带参数的QueueDeclare重载:
var queue = advancedBus.QueueDeclare();
请注意,EasyNetQ的自动消费者从新链接逻辑被关闭以用于独占队列。
3,绑定
你将一个队列绑定到像这样的交换机上:
var queue = advancedBus.QueueDeclare("my.queue"); var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic); var binding = advancedBus.Bind(exchange, queue, "A.*");
要指定队列和交换机之间的多个绑定,只需执行多个绑定调用便可:
var queue = advancedBus.QueueDeclare("my.queue"); var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic); advancedBus.Bind(exchange, queue, "A.B"); advancedBus.Bind(exchange, queue, "A.C");
你也能够将交换机绑定在一个链上:
var sourceExchange = advancedBus.ExchangeDeclare("my.exchange.1", ExchangeType.Topic); var destinationExchange = advancedBus.ExchangeDeclare("my.exchange.2", ExchangeType.Topic); var queue = advancedBus.QueueDeclare("my.queue"); advancedBus.Bind(sourceExchange, destinationExchange, "A.*"); advancedBus.Bind(destinationExchange, queue, "A.C");
4,发布
先进的Publish方法容许您指定要发布消息的交换机。 它还容许访问消息的AMQP基本属性。
建立你的消息。 高级API要求您的消息包装在消息中:
var myMessage = new MyMessage {Text = "Hello from the publisher"}; var message = new Message<MyMessage>(myMessage);
Message类可以让您访问AMQP基本属性,例如:
message.Properties.AppId = "my_app_id"; message.Properties.ReplyTo = "my_reply_queue";
最后使用发布方法发布您的消息。 在这里,咱们正在向默认交流发布:
bus.Publish(Exchange.GetDefault(), queueName, false, false, message);
发布的重载容许您绕过EasyNetQ的消息序列化并建立本身的字节数组消息:
var properties = new MessageProperties(); var body = Encoding.UTF8.GetBytes("Hello World!"); bus.Publish(Exchange.GetDefault(), queueName, false, false, properties, body);
5,订阅
使用IAdvancedBus的Consume方法来消费队列中的消息。
IDisposable Consume<T>(IQueue queue, Func<IMessage<T>, MessageReceivedInfo, Task> onMessage) where T : class;
onMessage委托是您为消息传递提供的处理程序。 其参数以下:
如上面发布部分所述,IMessage使您能够访问消息及其MessageProperties。 MessageReceivedInfo为您提供有关消息消耗的上下文的额外信息:
public class MessageReceivedInfo { public string ConsumerTag { get; set; } public ulong DeliverTag { get; set; } public bool Redelivered { get; set; } public string Exchange { get; set; } public string RoutingKey { get; set; } }
您返回一个容许您编写非阻塞异步处理程序的任务。
消耗方法返回一个IDisposable。 调用其Dispose方法来取消使用者。
若是您只须要同步处理程序,则可使用同步重载:
IDisposable Consume<T>(IQueue queue, Action<IMessage<T>, MessageReceivedInfo> onMessage) where T : class;
要绕过EasyNetQ的消息序列化器,请使用提供原始字节数组消息的消耗超载:
void Consume(IQueue queue, Func<Byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);
在这个例子中,咱们使用队列'my_queue'中的原始消息字节:
var queue = advancedBus.QueueDeclare("my_queue"); advancedBus.Consume(queue, (body, properties, info) => Task.Factory.StartNew(() => { var message = Encoding.UTF8.GetString(body); Console.WriteLine("Got message: '{0}'", message); }));
您能够选择使用Consume方法的这种重载向单个使用者注册多个处理程序:
IDisposable Consume(IQueue queue, Action<IHandlerRegistration> addHandlers);
IHandlerRegistration接口以下所示:
public interface IHandlerRegistration { /// <summary> /// 添加异步处理程序 /// </summary> /// <typeparam name="T">The message type</typeparam> /// <param name="handler">The handler</param> /// <returns></returns> IHandlerRegistration Add<T>(Func<IMessage<T>, MessageReceivedInfo, Task> handler) where T : class; /// <summary> /// 添加同步处理程序 /// </summary> /// <typeparam name="T">消息类型</typeparam> /// <param name="handler">The handler</param> /// <returns></returns> IHandlerRegistration Add<T>(Action<IMessage<T>, MessageReceivedInfo> handler) where T : class; /// <summary> ///若是处理程序集合在未找到匹配的处理程序时应抛出EasyNetQException,则设置为true;若是应返回noop处理程序,则设置为false .Default为true。 /// </summary> bool ThrowOnNoMatchingHandler { get; set; } }
在这个例子中,咱们注册了两个不一样的处理程序,一个处理MyMessage类型的消息,另外一个处理MyOtherMessage类型的消息:
bus.Advanced.Consume(queue, x => x .Add<MyMessage>((message, info) => { Console.WriteLine("Got MyMessage {0}", message.Body.Text); countdownEvent.Signal(); }) .Add<MyOtherMessage>((message, info) => { Console.WriteLine("Got MyOtherMessage {0}", message.Body.Text); countdownEvent.Signal(); }) );
查看这篇博文了解更多信息:http://mikehadlow.blogspot.co.uk/2013/11/easynetq-multiple-handlers-per-consumer.html
6,从队列中获取单个消息
要从队列中获取单条消息,请使用IAdvancedBus.Get方法:
IBasicGetResult<T> Get<T>(IQueue queue) where T : class;
从AMQP文档:“此方法使用同步对话提供对队列中消息的直接访问,该同步对话旨在用于同步功能比性能更重要的特定类型的应用程序。” 不要使用Get来轮询消息。 在典型的应用场景中,您应该始终支持消费。
IBasicGetResult具备如下签名:
/// <summary> ///AdvancedBus Get方法的结果 /// </summary> /// <typeparam name="T"></typeparam> public interface IBasicGetResult<T> where T : class { /// <summary> ///若是消息可用,则为true,不然为false。 /// </summary> bool MessageAvailable { get; } /// <summary> /// 消息从队列中回收。 若是没有消息可用,此属性将引起MessageNotAvailableException。 在尝试访问它以前,您应该检查MessageAvailable属性。 /// </summary> IMessage<T> Message { get; } }
在访问Message属性前老是检查MessageAvailable方法。
一个例子:
var queue = advancedBus.QueueDeclare("get_test"); advancedBus.Publish(Exchange.GetDefault(), "get_test", false, false, new Message<MyMessage>(new MyMessage{ Text = "Oh! Hello!" })); var getResult = advancedBus.Get<MyMessage>(queue); if (getResult.MessageAvailable) { Console.Out.WriteLine("Got message: {0}", getResult.Message.Body.Text); } else { Console.Out.WriteLine("Failed to get message!"); }
要访问原始二进制消息,请使用非通用Get方法:
IBasicGetResult Get(IQueue queue);
非泛型IBasicGetResult具备如下定义:
public interface IBasicGetResult { byte[] Body { get; } MessageProperties Properties { get; } MessageReceivedInfo Info { get; } }
7,消息类型必须匹配
EasyNetQ高级API指望订户仅接收通用类型参数提供的类型的消息。 在上面的例子中,只有MyMessage类型的消息应该被接收。 可是,EasyNetQ不保护您不向用户发布错误类型的消息。 我能够很容易地设置一个交换绑定队列拓扑来发布NotMyMessage类型的消息,该消息将被上面的处理程序接收。 若是接收到错误类型的消息,EasyNetQ将抛出EasyNetQInvalidMessageTypeException异常:
EasyNetQ.EasyNetQInvalidMessageTypeException: Message type is incorrect. Expected 'EasyNetQ_Tests_MyMessage:EasyNetQ_Tests', but was 'EasyNetQ_Tests_MyOtherMessage:EasyNetQ_Tests' at EasyNetQ.RabbitAdvancedBus.CheckMessageType[TMessage](MessageProperties properties) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 217 at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass1`1.<Subscribe>b__0(Byte[] body, MessageProperties properties, MessageReceivedInfo messageRecievedInfo) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 131 at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass6.<Subscribe>b__5(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, Byte[] body) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 176 at EasyNetQ.QueueingConsumerFactory.HandleMessageDelivery(BasicDeliverEventArgs basicDeliverEventArgs) in D:\Source\EasyNetQ\Source\EasyNetQ\QueueingConsumerFactory.cs:line 85
8,事件
当经过RabbitHutch实例化一个IBus时,您能够指定一个AdvancedBusEventHandlers。 该类包含IAdvancedBus中存在的每一个事件的事件处理程序属性,并提供了在总线实例化以前指定事件处理程序的方法。
没必要使用它,由于一旦建立了总线,它仍然能够添加事件处理程序。 可是,若是您但愿可以捕获RabbitAdvancedBus的第一个Connected事件,则必须将AdvancedBusEventHandlers与Connected EventHandler一块儿使用。 这是由于总线将在构造函数返回以前尝试链接一次,若是链接尝试成功,将会引起RabbitAdvancedBus.OnConnected。
var buss = RabbitHutch.CreateBus("host=localhost", new AdvancedBusEventHandlers(connected: (s, e) => { var advancedBus = (IAdvancedBus)s; Console.WriteLine(advancedBus.IsConnected); // This will print true. }));