.NET 云原生架構(gòu)師訓(xùn)練營(yíng)(模塊二 基礎(chǔ)鞏固 RabbitMQ Masstransit 異常處理)--學(xué)習(xí)筆記
2.6.8 RabbitMQ -- Masstransit 異常處理
異常處理
其他
高級(jí)功能
異常處理
異常與重試
重試配置
重試條件
重新投遞信息
信箱
異常與重試
Exception
public class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
public Task Consume(ConsumeContext context)
{
throw new Exception("Very bad things happened");
}
}
UseMessageRetry
var sessionFactory = CreateSessionFactory();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("rabbitmq://localhost/");
cfg.ReceiveEndpoint("submit-order", e =>
{
e.UseMessageRetry(r => r.Immediate(5));
e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});
});
重試配置

// 立即重試:一共連續(xù)重試10次
ep.UseMessageRetry(r => r.Immediate(10));
// 間隔重試:一共重試10次,每次間隔10秒
ep.UseMessageRetry(r => r.Interval(10, TimeSpan.FromSeconds(10)));
// 多個(gè)間隔重試:5秒后第一次,5+10秒后第二次,5+10+15秒后第三次
ep.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15)));
// 指數(shù)級(jí)間隔重試:共10次,每次間隔:當(dāng)前重試次數(shù) * 60秒
ep.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(60), TimeSpan.FromHours(24), TimeSpan.FromSeconds(60)));
// 每次疊加50秒
ep.UseMessageRetry(r => r.Incremental(10, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(50)));
重試條件
e.UseMessageRetry(r =>
{
r.Handle();
r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
r.Ignore(t => t.ParamName == "orderTotal");
});
過(guò)濾某些異常類(lèi)型不進(jìn)行重試
重新投遞信息
cfg.ReceiveEndpoint("submit-order", e =>
{
e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
e.UseMessageRetry(r => r.Immediate(5));
e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});
消息沖隊(duì)列移除之后,在一定時(shí)間之后重新投入消息隊(duì)列。需要配置調(diào)度模塊(scheduling)
信箱
cfg.ReceiveEndpoint("submit-order", e =>
{
e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
e.UseMessageRetry(r => r.Immediate(5));
e.UseInMemoryOutbox();
e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});
有些消息是在 consume 方法中發(fā)送或發(fā)布的,如果在發(fā)送之后 consume 中產(chǎn)生了異常,那原來(lái)發(fā)出去的消息就需要撤回,如果使用信箱之后,在 consume 中要發(fā)布/發(fā)送的消息就會(huì)先暫存在內(nèi)存中直到 consume 方法成功之后才真正發(fā)出去
其他
Fault
Consuming Faults
Error Pipe
Dead-Letter Pipe
Fault
public interface Fault<T>
where T : class
{
Guid FaultId { get; }
Guid? FaultedMessageId { get; }
DateTime Timestamp { get; }
ExceptionInfo[] Exceptions { get; }
HostInfo Host { get; }
T Message { get; }
}
Fault
Consuming Faults
public class DashboardFaultConsumer :
IConsumer>
{
public async Task Consume(ConsumeContext> context)
{
// update the dashboard
}
}
Fault
Error Pipe
cfg.ReceiveEndpoint("input-queue", ec =>
{
ec.DiscardFaultedMessages();
});
默認(rèn)情況下錯(cuò)誤的消息會(huì)被投遞到了 _error 隊(duì)列,可以配置直接拋棄錯(cuò)誤信息
Dead-Letter Pipe
cfg.ReceiveEndpoint("input-queue", ec =>
{
ec.DiscardSkippedMessages();
});
死信隊(duì)列:沒(méi)有消費(fèi)者的消息會(huì)被移到 _skipped 隊(duì)列,但可以配置為不移到 _skipped 隊(duì)列
高級(jí)功能
持久化
Saga 事件串
調(diào)度
Courier 最終一致性
監(jiān)控
課程鏈接
.NET云原生架構(gòu)師訓(xùn)練營(yíng)講什么,怎么講,講多久
評(píng)論
圖片
表情
