<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          .NET 云原生架構(gòu)師訓(xùn)練營(yíng)(模塊二 基礎(chǔ)鞏固 RabbitMQ Masstransit 詳解)--學(xué)習(xí)筆記

          共 5575字,需瀏覽 12分鐘

           ·

          2021-01-15 17:29

          2.6.7 RabbitMQ -- Masstransit 詳解

          • Consumer 消費(fèi)者

          • Producer 生產(chǎn)者

          • Request-Response 請(qǐng)求-響應(yīng)

          Consumer 消費(fèi)者

          在 MassTransit 中,一個(gè)消費(fèi)者可以消費(fèi)一種或多種消息

          消費(fèi)者的類(lèi)型包括:普通消費(fèi)者,saga,saga 狀態(tài)機(jī),路由活動(dòng)(分布式追蹤),處理器 handlers,工作消費(fèi)者 job comsumers

          • Consumer

          • Instance

          • Handler

          • Others

          Consumer

          public class Program
          {
          public static async Task Main()
          {
          var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
          {
          cfg.ReceiveEndpoint("order-service", e =>
          {
          e.Consumer();
          });
          });
          }
          }

          繼承 IConsumer,實(shí)現(xiàn) Consume 方法

          class SubmitOrderConsumer :
          IConsumer<SubmitOrder>
          {
          public async Task Consume(ConsumeContext context)
          {
          await context.Publish(new
          {
          context.Message.OrderId
          });
          }
          }

          三個(gè)原則:

          • 擁抱 The Hollywood Principle, which states, "Dont't call us, we'll call you."

          • Consume 方法是一個(gè)被等待的方法,在執(zhí)行中時(shí)其他消費(fèi)者無(wú)法接收到這個(gè)消息,當(dāng)這個(gè)方法完成的時(shí)候,消息被 ack,并且從隊(duì)列中移除

          • Task 方法異常會(huì)導(dǎo)致消息觸發(fā) retry,如果沒(méi)有配置重試,消息將被投遞到失敗隊(duì)列

          Instance

          public class Program
          {
          public static async Task Main()
          {
          var submitOrderConsumer = new SubmitOrderConsumer();

          var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
          {
          cfg.ReceiveEndpoint("order-service", e =>
          {
          e.Instance(submitOrderConsumer);
          });
          });
          }
          }

          所有接收到的消息都由一個(gè)消費(fèi)者來(lái)實(shí)例來(lái)處理(請(qǐng)確保這個(gè)消費(fèi)者類(lèi)是線(xiàn)程安全)

          Consumer 每次接收到消息都會(huì) new 一個(gè)實(shí)例

          Handler

          public class Program
          {
          public static async Task Main()
          {
          var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
          {
          cfg.ReceiveEndpoint("order-service", e =>
          {
          e.Handler(async context =>
          {
          await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
          });
          });
          });
          }
          }

          通過(guò)一個(gè)委托 Lambda 方法,來(lái)消費(fèi)消息

          Others

          • Saga<>

          • StateMachineSaga<>

          Producer 生產(chǎn)者

          消息的生產(chǎn)可以通過(guò)兩種方式產(chǎn)生:發(fā)送和發(fā)布

          發(fā)送的時(shí)候需要指定一個(gè)具體的地址 DestinationAddress,發(fā)布的時(shí)候消息會(huì)被廣播給所有訂閱了這個(gè)消息類(lèi)型的消費(fèi)者

          基于這兩種規(guī)則,消息被定義為:命令 command 和事件 event

          • send

          • publish

          send

          可以調(diào)用以下對(duì)象的 send 方法來(lái)發(fā)送 command:

          • ConsumeContext (在 Consumer 的 Consumer 方法參數(shù)中傳遞)

          • ISendEndpointProvider(可以從 DI 中獲?。?/p>

          • IBusControl(最頂層的控制對(duì)象,用來(lái)啟動(dòng)和停止 masstransit 的控制器)

          ConsumeContext

          public class SubmitOrderConsumer : 
          IConsumer<SubmitOrder>
          {
          private readonly IOrderSubmitter _orderSubmitter;

          public SubmitOrderConsumer(IOrderSubmitter submitter)
          => _orderSubmitter = submitter;

          public async Task Consume(IConsumeContext context)
          {
          await _orderSubmitter.Process(context.Message);

          await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
          }
          }

          ISendEndpointProvider

          public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
          {
          var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);

          await endpoint.Send(new SubmitOrder { OrderId = "123" });
          }

          publish

          • 發(fā)送地址

          • 短地址

          • Convention Map

          發(fā)送地址

          • rabbitmq://localhost/input-queue

          • rabbitmq://localhost/input-queue?durable=false

          短地址

          • GetSendEndpoint(new Uri("queue:input-queue"))

          Convention Map

          在配置文件中指定 map 規(guī)則

          EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));

          直接發(fā)送

          public class SubmitOrderConsumer : 
          IConsumer<SubmitOrder>
          {
          private readonly IOrderSubmitter _orderSubmitter;

          public SubmitOrderConsumer(IOrderSubmitter submitter)
          => _orderSubmitter = submitter;

          public async Task Consume(IConsumeContext context)
          {
          await _orderSubmitter.Process(context.Message);

          await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
          }
          }

          可以調(diào)用以下對(duì)象的 publish 方法來(lái)發(fā)送 event:

          • ConsumeContext (在 Consumer 的 Consumer 方法參數(shù)中傳遞)

          • IPublishEndpoint(可以從 DI 中獲?。?/p>

          • IBusControl(最頂層的控制對(duì)象,用來(lái)啟動(dòng)和停止 masstransit 的控制器)

          IPublishEndpoint

          public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
          {
          await publishEndpoint.Publish(new
          {
          OrderId = "27",
          OrderDate = DateTime.UtcNow,
          });
          }

          Request-Response 請(qǐng)求-響應(yīng)

          Request-Response 模式讓?xiě)?yīng)用程序之間解耦之后,依然采用同步的方式

          • Consumer

          • IClientFactory

          • IRequestClient

          • Send a request

          Consumer

          public async Task Consume(ConsumeContext context)
          {
          var order = await _orderRepository.Get(context.Message.OrderId);
          if (order == null)
          throw new InvalidOperationException("Order not found");

          await context.RespondAsync(new
          {
          OrderId = order.Id,
          order.Timestamp,
          order.StatusCode,
          order.StatusText
          });
          }

          需要處理返回類(lèi)型 OrderStatusResult,異步方式模擬同步,實(shí)際上同樣有消息隊(duì)列,消費(fèi)者處理過(guò)程

          IClientFactory

          public interface IClientFactory 
          {
          IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);

          IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);

          RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);

          RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
          }

          通過(guò) IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory

          IRequestClient

          public interface IRequestClient<TRequest>
          where TRequest : class
          {
          RequestHandle Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);

          Task> GetResponse(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
          }

          RequestClient 可以創(chuàng)建請(qǐng)求,或者直接獲得響應(yīng)

          Send a request

          var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
          var client = bus.CreateRequestClient(serviceAddress);

          var response = await client.GetResponse(new { OrderId = id});

          課程鏈接

          .NET云原生架構(gòu)師訓(xùn)練營(yíng)講什么,怎么講,講多久


          歡迎各位讀者加入微信群一起學(xué)習(xí)交流,
          在公眾號(hào)后臺(tái)回復(fù)“加群”即可~~


          瀏覽 107
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲中文无码乱伦 | 日韩一级在线播放免费观看 | av片电影在线播放 | 爱情岛成人亚洲WWW论坛 | 91麻豆国产福利精品精华液 |