欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ -- Masstransit 异常处理

程序员文章站 2022-07-12 13:15:49
...

Exception

public class SubmitOrderConsumer :
IConsumer
{
public Task Consume(ConsumeContext context)
{
throw new Exception(“Very bad things happened”);
}
}
UseMessageRetry

var sessionFactory = CreateSessionFactory();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(“rabbitmq://localhost/”);

cfg.ReceiveEndpoint("submit-order", e =>
{
    e.UseMessageRetry(r => r.Immediate(5));

    e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});

});
重试配置

// 立即重试:一共连续重试10次
ep.UseMessageRetry(r => r.Immediate(10));

// 间隔重试:一共重试10次,每次间隔10秒
ep.UseMessageRetry(r => r.Interval(10, TimeSpan.FromSeconds(10)));

// 多个间隔重试:5秒后第一次,5+10秒后第二次,5+10+15秒后第三次
ep.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15)));

// 指数级间隔重试:共10次,每次间隔:当前重试次数 * 60秒
ep.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(60), TimeSpan.FromHours(24), TimeSpan.FromSeconds(60)));

// 每次叠加50秒
ep.UseMessageRetry(r => r.Incremental(10, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(50)));
重试条件
e.UseMessageRetry(r =>
{
r.Handle();
r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
r.Ignore(t => t.ParamName == “orderTotal”);
});
过滤某些异常类型不进行重试

重新投递信息
cfg.ReceiveEndpoint(“submit-order”, e =>
{
e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
e.UseMessageRetry(r => r.Immediate(5));
e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});
消息冲队列移除之后,在一定时间之后重新投入消息队列。需要配置调度模块(scheduling)

信箱
cfg.ReceiveEndpoint(“submit-order”, e =>
{
e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
e.UseMessageRetry(r => r.Immediate(5));
e.UseInMemoryOutbox();

e.Consumer(() => new SubmitOrderConsumer(sessionFactory));

});
有些消息是在 consume 方法中发送或发布的,如果在发送之后 consume 中产生了异常,那原来发出去的消息就需要撤回,如果使用信箱之后,在 consume 中要发布/发送的消息就会先暂存在内存中直到 consume 方法成功之后才真正发出去

其他
Fault
Consuming Faults
Error Pipe
Dead-Letter Pipe
Fault
public interface Fault
where T : class
{
Guid FaultId { get; }
Guid? FaultedMessageId { get; }
DateTime Timestamp { get; }
ExceptionInfo[] Exceptions { get; }
HostInfo Host { get; }
T Message { get; }
}
Fault 消息在异常的时候会发布出来

Consuming Faults
public class DashboardFaultConsumer :
IConsumer<Fault>
{
public async Task Consume(ConsumeContext<Fault> context)
{
// update the dashboard
}
}
Fault 消息也是可以进行订阅的

Error Pipe
cfg.ReceiveEndpoint(“input-queue”, ec =>
{
ec.DiscardFaultedMessages();
});
默认情况下错误的消息会被投递到了 _error 队列,可以配置直接抛弃错误信息

Dead-Letter Pipe
cfg.ReceiveEndpoint(“input-queue”, ec =>
{
ec.DiscardSkippedMessages();
});

亚马逊测评 www.yisuping.cn