Exceptionsession
public class SubmitOrderConsumer : IConsumer<SubmitOrder> { public Task Consume(ConsumeContext<SubmitOrder> context) { throw new Exception("Very bad things happened"); } }
UseMessageRetryapp
var sessionFactory = CreateSessionFactory(); var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => { cfg.Host("rabbitmq://localhost/"); cfg.ReceiveEndpoint("submit-order", e => { e.UseMessageRetry(r => r.Immediate(5)); e.Consumer(() => new SubmitOrderConsumer(sessionFactory)); }); });
// 当即重试:一共连续重试10次 ep.UseMessageRetry(r => r.Immediate(10)); // 间隔重试:一共重试10次,每次间隔10秒 ep.UseMessageRetry(r => r.Interval(10, TimeSpan.FromSeconds(10))); // 多个间隔重试:5秒后第一次,5+10秒后第二次,5+10+15秒后第三次 ep.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15))); // 指数级间隔重试:共10次,每次间隔:当前重试次数 * 60秒 ep.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(60), TimeSpan.FromHours(24), TimeSpan.FromSeconds(60))); // 每次叠加50秒 ep.UseMessageRetry(r => r.Incremental(10, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(50)));
e.UseMessageRetry(r => { r.Handle<ArgumentNullException>(); r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException)); r.Ignore<ArgumentException>(t => t.ParamName == "orderTotal"); });
过滤某些异常类型不进行重试async
cfg.ReceiveEndpoint("submit-order", e => { e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); e.UseMessageRetry(r => r.Immediate(5)); e.Consumer(() => new SubmitOrderConsumer(sessionFactory)); });
消息冲队列移除以后,在必定时间以后从新投入消息队列。须要配置调度模块(scheduling)ui
cfg.ReceiveEndpoint("submit-order", e => { e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); e.UseMessageRetry(r => r.Immediate(5)); e.UseInMemoryOutbox(); e.Consumer(() => new SubmitOrderConsumer(sessionFactory)); });
有些消息是在 consume 方法中发送或发布的,若是在发送以后 consume 中产生了异常,那原来发出去的消息就须要撤回,若是使用信箱以后,在 consume 中要发布/发送的消息就会先暂存在内存中直到 consume 方法成功以后才真正发出去3d
public interface Fault<T> where T : class { Guid FaultId { get; } Guid? FaultedMessageId { get; } DateTime Timestamp { get; } ExceptionInfo[] Exceptions { get; } HostInfo Host { get; } T Message { get; } }
Fault
public class DashboardFaultConsumer : IConsumer<Fault<SubmitOrder>> { public async Task Consume(ConsumeContext<Fault<SubmitOrder>> context) { // update the dashboard } }
Fault
cfg.ReceiveEndpoint("input-queue", ec => { ec.DiscardFaultedMessages(); });
默认状况下错误的消息会被投递到了 _error 队列,能够配置直接抛弃错误信息rabbitmq
cfg.ReceiveEndpoint("input-queue", ec => { ec.DiscardSkippedMessages(); });
死信队列:没有消费者的消息会被移到 _skipped 队列,但能够配置为不移到 _skipped 队列队列
本做品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。事件
欢迎转载、使用、从新发布,但务必保留文章署名 郑子铭 (包含连接: http://www.cnblogs.com/MingsonZheng/ ),不得用于商业目的,基于本文修改后的做品务必以相同的许可发布。
若有任何疑问,请与我联系 (MingsonZheng@outlook.com) 。