<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          MassTransit | .NET 分布式應用框架

          共 15312字,需瀏覽 31分鐘

           ·

          2022-11-03 19:16

          引言

          ?

          A free, open-source distributed application framework for .NET. 一個免費、開源的.NET 分布式應用框架。 -- MassTransit 官網(wǎng)

          ?

          MassTransit[1],直譯公共交通, 是由Chris Patterson開發(fā)的基于消息驅動的.NET 分布式應用框架,其核心思想是借助消息來實現(xiàn)服務之間的松耦合異步通信,進而確保應用更高的可用性、可靠性和可擴展性。通過對消息模型的高度抽象,以及對主流的消息代理(包括RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS等)的集成,大大簡化了基于消息驅動的開發(fā)門檻,同時內(nèi)置了連接管理、消息序列化和消費者生命周期管理,以及諸如重試、限流、斷路器等異常處理機制,讓開發(fā)者更好的專注于業(yè)務實現(xiàn)。

          簡而言之,MassTransit實現(xiàn)了消息代理透明化。無需面向消息代理編程進行諸如連接管理、隊列的申明和綁定等操作,即可輕松實現(xiàn)應用間消息的傳遞和消費。

          快速體驗

          空口無憑,創(chuàng)建一個項目快速體驗一下。

          1. 基于worker模板創(chuàng)建一個基礎項目:dotnet new worker -n MassTransit.Demo
          2. 打開項目,添加NuGet包:MassTransit
          3. 定義訂單創(chuàng)建事件消息契約:
          using System;

          namespace MassTransit.Demo
          {
              public record OrderCreatedEvent
              {
                  public Guid OrderId { getset; }
              }
          }
          1. 修改Worker類,發(fā)送訂單創(chuàng)建事件:
          namespace MassTransit.Demo;

          public class Worker : BackgroundService
          {
              readonly IBus _bus;//注冊總線
              public Worker(IBus bus)
              {
                  _bus = bus;
              }
              protected override async Task ExecuteAsync(CancellationToken stoppingToken)
              {
                  while (!stoppingToken.IsCancellationRequested)
                  {
                      //模擬并發(fā)送訂單創(chuàng)建事件
                      await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken);
                      await Task.Delay(1000, stoppingToken);
                  }
              }
          }

          1. 僅需實現(xiàn)IConsumer<OrderCreatedEvent>泛型接口,即可實現(xiàn)消息的訂閱:
          public class OrderCreatedEventConsumerIConsumer<OrderCreatedEvent>
          {
              private readonly ILogger<OrderCreatedEventConsumer> _logger;
              public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger)
              {
                  _logger = logger;
              }
              public Task Consume(ConsumeContext<OrderCreatedEvent> context)
              {
                  _logger.LogInformation($"Received Order:{context.Message.OrderId}");
                  return Task.CompletedTask;
              }
          }
          1. 注冊服務:
          using MassTransit;
          using MassTransit.Demo;

          IHost host = Host.CreateDefaultBuilder(args)
              .ConfigureServices(services =>
              {
                  services.AddHostedService<Worker>();
                  services.AddMassTransit(configurator =>
                  {
                      //注冊消費者
                      configurator.AddConsumer<OrderCreatedEventConsumer>();
                      //使用基于內(nèi)存的消息路由傳輸
                      configurator.UsingInMemory((context, cfg) =>
                      {
                          cfg.ConfigureEndpoints(context);
                      });
                  });
              })
              .Build();

          await host.RunAsync();

          1. 運行項目,一個簡單的進程內(nèi)事件發(fā)布訂閱的應用就完成了。

          如果需要使用RabbitMQ 消息代理進行消息傳輸,則僅需安裝MassTransit.RabbitMQNuGet包,然后指定使用RabbitMQ 傳輸消息即可。

          using MassTransit;
          using MassTransit.Demo;

          IHost host = Host.CreateDefaultBuilder(args)
              .ConfigureServices(services =>
              {
                  services.AddHostedService<Worker>();
                  services.AddMassTransit(configurator =>
                  {
                      configurator.AddConsumer<OrderCreatedEventConsumer>();
                      
                      // configurator.UsingInMemory((context, cfg) =>
                      // {
                      //     cfg.ConfigureEndpoints(context);
                      // });
                      
                      configurator.UsingRabbitMq((context, cfg) =>
                      {
                          cfg.Host(
                              host: "localhost",
                              port: 5672,
                              virtualHost: "/",
                              configure: hostConfig =>
                              {
                                  hostConfig.Username("guest");
                                  hostConfig.Password("guest");
                              });
                          cfg.ConfigureEndpoints(context);
                      });
                  });
              })
              .Build();

          await host.RunAsync();

          運行項目,MassTransit會自動在指定的RabbitMQ上創(chuàng)建一個類型為fanoutMassTransit.Demo.OrderCreatedEventExchange和一個與OrderCreatedEvent同名的隊列進行消息傳輸,如下圖所示。

          核心概念

          MassTranist 為了實現(xiàn)消息代理的透明化和應用間消息的高效傳輸,抽象了以下概念,其中消息流轉流程如下圖所示:

          1. Message:消息契約,定義了消息生產(chǎn)者和消息消費者之間的契約。
          2. Producer:生產(chǎn)者,發(fā)送消息的一方都可以稱為生產(chǎn)者。
          3. SendEndpoint:發(fā)送端點,用于將消息內(nèi)容序列化,并發(fā)送到傳輸模塊。
          4. Transport:傳輸模塊,消息代理透明化的核心,用于和消息代理通信,負責發(fā)送和接收消息。
          5. ReceiveEndpoint:接收端點,用于從傳輸模塊接收消息,反序列化消息內(nèi)容,并將消息路由到消費者。
          6. Consumer:消費者,用于消息消費。

          從上圖可知,本質(zhì)上還是發(fā)布訂閱模式的實現(xiàn),接下來就核心概念進行詳解。

          Message

          Message:消息,可以使用class、interface、struct和record來創(chuàng)建,消息作為一個契約,需確保創(chuàng)建后不能篡改,因此應只保留只讀屬性且不應包含方法和行為。MassTransit使用的是包含命名空間的完全限定名即typeof(T).FullName來表示特定的消息類型。因此若在另外的項目中消費同名的消息類型,需確保消息的命名空間相同。另外需注意「消息不應繼承」,以避免發(fā)送基類消息類型造成的不可預期的結果。為避免此類情況,官方建議「使用接口來定義消息」。在MassTransit中,消息主要分為兩種類型:

          1. Command:命令,「用于告訴服務做什么」,命令被「發(fā)送」到指定端點,僅被一個服務接收并執(zhí)行。一般以動名詞結構命名,如:UpdateAddress、CancelOrder。
          2. Event:事件,「用于告訴服務什么發(fā)生了」,事件被「發(fā)布」到多個端點,可以被多個服務消費。一般以過去式結構命名,如:AddressUpdated,OrderCanceled。

          經(jīng)過MassTransit發(fā)送的消息,會使用信封包裝,包含一些附加信息,數(shù)據(jù)結構舉例如下:

          {
              "messageId""6c600000-873b-00ff-9a8f-08da8da85542",
              "requestId"null,
              "correlationId"null,
              "conversationId""6c600000-873b-00ff-9526-08da8da85544",
              "initiatorId"null,
              "sourceAddress""rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true",
              "destinationAddress""rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent",
              "responseAddress"null,
              "faultAddress"null,
              "messageType": [
                  "urn:message:MassTransit.Demo:OrderCreatedEvent"
              ],
              "message": {
                  "orderId""fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8"
              },
              "expirationTime"null,
              "sentTime""2022-09-03T12:32:15.0796943Z",
              "headers": {},
              "host": {
                  "machineName""THINKPAD",
                  "processName""MassTransit.Demo",
                  "processId"24684,
                  "assembly""MassTransit.Demo",
                  "assemblyVersion""1.0.0.0",
                  "frameworkVersion""6.0.5",
                  "massTransitVersion""8.0.6.0",
                  "operatingSystemVersion""Microsoft Windows NT 10.0.19044.0"
              }
          }

          從以上消息實例中可以看出一個包裝后的消息包含以下核心屬性:

          1. messageId:全局唯一的消息ID
          2. messageType:消息類型
          3. message:消息體,也就是具體的消息實例
          4. sourceAddress:消息來源地址
          5. destinationAddress:消息目標地址
          6. responseAddress:響應地址,在請求響應模式中使用
          7. faultAddress:消息異常發(fā)送地址,用于存儲異常消費消息
          8. headers:消息頭,允許應用自定義擴展信息
          9. correlationId:關聯(lián)Id,在Saga狀態(tài)機中會用到,用來關聯(lián)系列事件
          10. host:宿主,消息來源應用的宿主信息

          Producer

          Producer,生產(chǎn)者,即用于生產(chǎn)消息。在MassTransit主要借助以下對象進行命令的發(fā)送和事件的發(fā)布。

          從以上類圖可以看出,消息的發(fā)送主要核心依賴于兩個接口:

          1. ISendEndpoint:提供了Send方法,用于發(fā)送命令。
          2. IPublishEndpoint:提供了Publish方法,用于發(fā)布事件。

          但基于上圖的繼承體系,可以看出通過IBus、ISendEndpointProviderConsumeContext進行命令的發(fā)送;通過IBusIPublishEndpointProvider進行事件的發(fā)布。具體舉例如下:

          發(fā)送命令

          1. 通過IBus發(fā)送:
          private readonly IBus _bus;
          public async Task Post(CreateOrderRequest request)
          {
              //通過以下方式配置對應消息類型的目標地址
              EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));
              await _bus.Send(request);
          }
          1. 通過ISendEndpointProvider發(fā)送:
          private readonly ISendEndpointProvider  _sendEndpointProvider;
          public async Task Post(CreateOrderRequest request)
          {
              var serviceAddress = new Uri("queue:create-order");
              var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);
              await endpoint.Send(request);
          }
          1. 通過ConsumeContext發(fā)送:
          public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>
          {    
              public async Task Consume(ConsumeContext<CreateOrderRequest> context)
              {
               //do something else
                  var destinationAddress = new Uri("queue:lock-stock");
                  var command = new LockStockRequest(context.Message.OrderId);
                 
                  await context.Send<LockStockRequest>(destinationAddress, command);
             // 也可以通過獲取`SendEndpoint`發(fā)送命令
                  // var endpoint = await context.GetSendEndpoint(destinationAddress);
                  // await endpoint.Send<LockStockRequest>(command);
               
              }
          }

          發(fā)布事件

          1. 通過IBus發(fā)布:
          private readonly IBus _bus;
          public async Task Post(CreateOrderRequest request)
          {
              //do something
              await _bus.Publish(request);
          }
          1. 通過IPublishEndpoint發(fā)布:
          private readonly IPublishEndpoint _publishEndpoint;
          public async Task Post(CreateOrderRequest request)
          {
              //do something
              var order = CreateOrder(request);
              await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));
          }
          1. 通過ConsumeContext發(fā)布:
          public class CreateOrderRequestConsumerIConsumer<CreateOrderRequest>
          {    
              public async Task Consume(ConsumeContext<CreateOrderRequest> context)
              {
          、  var order = CreateOrder(conext.Message);
               await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));
              }
          }

          Consumer

          Consumer,消費者,即用于消費消息。MassTransit 包括多種消費者類型,主要分為無狀態(tài)和有狀態(tài)兩種消費者類型。

          無狀態(tài)消費者

          無狀態(tài)消費者,即消費者無狀態(tài),消息消費完畢,消費者就釋放。主要的消費者類型有:IConsumer<TMessage>、JobConsumer、IActivityRoutingSlip等。其中IConsumer<TMessage>已經(jīng)在上面的快速體驗部分舉例說明。而JobConsumer<TMessage>主要是對IConsumer<TMessage>的補充,其主要應用場景在于執(zhí)行耗時任務。而對于IActivityRoutingSlip則是MassTransit Courier的核心對象,主要用于實現(xiàn)Saga模式的分布式事務。MassTransit Courier[2] 實現(xiàn)了Routing Slip模式,通過按需有序組合一系列的Activity,得到一個用來限定消息處理順序的Routing Slip。而每個Activity的具體抽象就是IActivityIExecuteActivity。二者的差別在于IActivity定義了ExecuteCompensate兩個方法,而IExecuteActivitiy僅定義了Execute方法。其中Execute代表正向操作,Compensate代表反向補償操作。用一個簡單的下單流程:創(chuàng)建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。而對于具體實現(xiàn),可參閱文章:MassTransit | 基于MassTransit Courier實現(xiàn)Saga 編排式分布式事務

          有狀態(tài)消費者

          有狀態(tài)消費者,即消費者有狀態(tài),其狀態(tài)會持久化,代表的消費者類型為MassTransitStateMachine。MassTransitStateMachineMassTransit Automatonymous 庫定義的,Automatonymous 是一個.NET 狀態(tài)機庫,用于定義狀態(tài)機,包括狀態(tài)、事件和行為。MassTransitStateMachine就是狀態(tài)機的具體抽象,可以用其編排一系列事件來實現(xiàn)狀態(tài)的流轉,也可以用來實現(xiàn)Saga模式的分布式事務。并支持與EF Core和Dapper集成將狀態(tài)持久化到關系型數(shù)據(jù)庫,也支持將狀態(tài)持久化到MongoDB、Redis等數(shù)據(jù)庫。MassTransitStateMachine對于Saga模式分布式事務的實現(xiàn)方式與RoutingSlip不同,還是以簡單的下單流程:創(chuàng)建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示?;?code style="overflow-wrap: break-word;margin-right: 2px;margin-left: 2px;font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(53, 148, 247);background: rgba(59, 170, 250, 0.1);padding-right: 2px;padding-left: 2px;border-radius: 2px;height: 21px;line-height: 22px;">MassTransitStateMachine 實現(xiàn)分布式事務詳參后續(xù)文章。

          從上圖可知,通過MassTransitStateMachine可以將事件的執(zhí)行順序邏輯編排在一個集中的狀態(tài)機中,通過發(fā)送命令和訂閱事件來推動狀態(tài)流轉,而這也正是Saga編排模式的實現(xiàn)。

          應用場景

          了解完MassTransit的核心概念,接下來再來看下MassTransit的核心特性以及應用場景:

          1. 基于消息的請求響應模式:可用于同步通信
          2. Mediator模式:中間者模式的實現(xiàn),類似MediatR,但功能更完善
          3. 計劃任務:可用于執(zhí)行定時任務
          4. Routing Slip 模式:可用于實現(xiàn)Saga模式的分布式事務
          5. Saga 狀態(tài)機:可用于實現(xiàn)Saga模式的分布式事務
          6. 本地消息表:類似DotNetCore.Cap,用于實現(xiàn)最終一致性

          總體而言,MassTransit是一款優(yōu)秀的分布式應用框架,可作為分布式應用的消息總線,也可以用作單體應用的事件總線。感興趣的朋友不妨一觀。

          Reference

          [1]

          MassTransit: http://masstransit-project.com

          [2]

          MassTransit Courier: http://masstransit-project.com/advanced/courier/

          [3]

          AspNetCore&MassTransit Courier實現(xiàn)分布式事務: https://www.cnblogs.com/CKExp/p/15027238.html


          瀏覽 89
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人精品午夜无码免费 | 忘忧草天天色精品 | 大香蕉伊人电影网 | 自拍无码在线 | 亚洲小黄片 |