CAP-分布式事務(wù)的解決方案

CAP 是一個(gè)基于 .NET Standard 的 C# 庫(kù),它是一種處理分布式事務(wù)的解決方案,同樣具有 EventBus 的功能,它具有輕量級(jí)、易使用、高性能等特點(diǎn)。
https://github.com/dotnetcore/CAP
在我們構(gòu)建 SOA 或者 微服務(wù)系統(tǒng)的過(guò)程中,我們通常需要使用事件來(lái)對(duì)各個(gè)服務(wù)進(jìn)行集成,在這過(guò)程中簡(jiǎn)單的使用消息隊(duì)列并不能保證數(shù)據(jù)的最終一致性, CAP 采用的是和當(dāng)前數(shù)據(jù)庫(kù)集成的本地消息表的方案來(lái)解決在分布式系統(tǒng)互相調(diào)用的各個(gè)環(huán)節(jié)可能出現(xiàn)的異常,它能夠保證任何情況下事件消息都是不會(huì)丟失的。
你同樣可以把 CAP 當(dāng)做 EventBus 來(lái)使用,CAP提供了一種更加簡(jiǎn)單的方式來(lái)實(shí)現(xiàn)事件消息的發(fā)布和訂閱,在訂閱以及發(fā)布的過(guò)程中,你不需要繼承或?qū)崿F(xiàn)任何接口。
這是CAP集在ASP.NET Core 微服務(wù)架構(gòu)中的一個(gè)示意圖:
架構(gòu)預(yù)覽

CAP 實(shí)現(xiàn)了 eShop 電子書 中描述的發(fā)件箱模式
Getting Started
NuGet
你可以運(yùn)行以下下命令在你的項(xiàng)目中安裝 CAP。
PM> Install-Package DotNetCore.CAP
CAP 支持 Kafka、RabbitMQ、AzureServiceBus 等消息隊(duì)列,你可以按需選擇下面的包進(jìn)行安裝:
PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus
CAP 提供了 Sql Server, MySql, PostgreSQL,MongoDB 的擴(kuò)展作為數(shù)據(jù)庫(kù)存儲(chǔ):
// 按需選擇安裝你正在使用的數(shù)據(jù)庫(kù)
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB
Configuration
首先配置CAP到 Startup.cs 文件中,如下:
public void ConfigureServices(IServiceCollection services)
{
......
services.AddDbContext<AppDbContext>();
services.AddCap(x =>
{
//如果你使用的 EF 進(jìn)行數(shù)據(jù)操作,你需要添加如下配置:
x.UseEntityFramework<AppDbContext>(); //可選項(xiàng),你不需要再次配置 x.UseSqlServer 了
//如果你使用的ADO.NET,根據(jù)數(shù)據(jù)庫(kù)選擇進(jìn)行配置:
x.UseSqlServer("數(shù)據(jù)庫(kù)連接字符串");
x.UseMySql("數(shù)據(jù)庫(kù)連接字符串");
x.UsePostgreSql("數(shù)據(jù)庫(kù)連接字符串");
//如果你使用的 MongoDB,你可以添加如下配置:
x.UseMongoDB("ConnectionStrings"); //注意,僅支持MongoDB 4.0+集群
//CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作為MQ,根據(jù)使用選擇配置:
x.UseRabbitMQ("ConnectionStrings");
x.UseKafka("ConnectionStrings");
x.UseAzureServiceBus("ConnectionStrings");
});
}
發(fā)布
在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 進(jìn)行消息發(fā)送
public class PublishController : Controller
{
private readonly ICapPublisher _capBus;
public PublishController(ICapPublisher capPublisher)
{
_capBus = capPublisher;
}
//不使用事務(wù)
[Route("~/without/transaction")]
public IActionResult WithoutTransaction()
{
_capBus.Publish("xxx.services.show.time", DateTime.Now);
return Ok();
}
//Ado.Net 中使用事務(wù),自動(dòng)提交
[Route("~/adonet/transaction")]
public IActionResult AdonetWithTransaction()
{
using (var connection = new MySqlConnection(ConnectionString))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
{
//業(yè)務(wù)代碼
_capBus.Publish("xxx.services.show.time", DateTime.Now);
}
}
return Ok();
}
//EntityFramework 中使用事務(wù),自動(dòng)提交
[Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
{
//業(yè)務(wù)代碼
_capBus.Publish("xxx.services.show.time", DateTime.Now);
}
return Ok();
}
}
訂閱
Action Method
在 Action 上添加 CapSubscribeAttribute 來(lái)訂閱相關(guān)消息。
public class PublishController : Controller
{
[CapSubscribe("xxx.services.show.time")]
public void CheckReceivedMessage(DateTime datetime)
{
Console.WriteLine(datetime);
}
}
Service Method
如果你的訂閱方法沒有位于 Controller 中,則你訂閱的類需要繼承 ICapSubscribe:
namespace xxx.Service
{
public interface ISubscriberService
{
void CheckReceivedMessage(DateTime datetime);
}
public class SubscriberService: ISubscriberService, ICapSubscribe
{
[CapSubscribe("xxx.services.show.time")]
public void CheckReceivedMessage(DateTime datetime)
{
}
}
}
然后在 Startup.cs 中的 ConfigureServices() 中注入你的 ISubscriberService 類
public void ConfigureServices(IServiceCollection services)
{
//注意: 注入的服務(wù)需要在 `services.AddCap()` 之前
services.AddTransient<ISubscriberService,SubscriberService>();
services.AddCap(x=>{});
}
訂閱者組
訂閱者組的概念類似于 Kafka 中的消費(fèi)者組,它和消息隊(duì)列中的廣播模式相同,用來(lái)處理不同微服務(wù)實(shí)例之間同時(shí)消費(fèi)相同的消息。
當(dāng)CAP啟動(dòng)的時(shí)候,她將創(chuàng)建一個(gè)默認(rèn)的消費(fèi)者組,如果多個(gè)相同消費(fèi)者組的消費(fèi)者消費(fèi)同一個(gè)Topic消息的時(shí)候,只會(huì)有一個(gè)消費(fèi)者被執(zhí)行。相反,如果消費(fèi)者都位于不同的消費(fèi)者組,則所有的消費(fèi)者都會(huì)被執(zhí)行。
相同的實(shí)例中,你可以通過(guò)下面的方式來(lái)指定他們位于不同的消費(fèi)者組。
[CapSubscribe("xxx.services.show.time", Group = "group1" )]
public void ShowTime1(DateTime datetime)
{
}
[CapSubscribe("xxx.services.show.time", Group = "group2")]
public void ShowTime2(DateTime datetime)
{
}
ShowTime1 和 ShowTime2 處于不同的組,他們將會(huì)被同時(shí)調(diào)用。
PS,你可以通過(guò)下面的方式來(lái)指定默認(rèn)的消費(fèi)者組名稱:
services.AddCap(x =>
{
x.DefaultGroup = "default-group-name";
});
Dashboard
CAP 2.1+ 以上版本中提供了儀表盤(Dashboard)功能,你可以很方便的查看發(fā)出和接收到的消息。除此之外,你還可以在儀表盤中實(shí)時(shí)查看發(fā)送或者接收到的消息。
使用一下命令安裝 Dashboard:
PM> Install-Package DotNetCore.CAP.Dashboard
在分布式環(huán)境中,儀表盤內(nèi)置集成了 Consul 作為節(jié)點(diǎn)的注冊(cè)發(fā)現(xiàn),同時(shí)實(shí)現(xiàn)了網(wǎng)關(guān)代理功能,你同樣可以方便的查看本節(jié)點(diǎn)或者其他節(jié)點(diǎn)的數(shù)據(jù),它就像你訪問(wèn)本地資源一樣。
services.AddCap(x =>
{
//...
// 注冊(cè) Dashboard
x.UseDashboard();
// 注冊(cè)節(jié)點(diǎn)到 Consul
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5800;
d.NodeId = 1;
d.NodeName = "CAP No.1 Node";
});
});
儀表盤默認(rèn)的訪問(wèn)地址是:http://localhost:xxx/cap,你可以在d.MatchPath配置項(xiàng)中修改cap路徑后綴為其他的名字。

【推薦】.NET Core開發(fā)實(shí)戰(zhàn)視頻課程 ★★★
.NET Core實(shí)戰(zhàn)項(xiàng)目之CMS 第一章 入門篇-開篇及總體規(guī)劃
【.NET Core微服務(wù)實(shí)戰(zhàn)-統(tǒng)一身份認(rèn)證】開篇及目錄索引
Redis基本使用及百億數(shù)據(jù)量中的使用技巧分享(附視頻地址及觀看指南)
.NET Core中的一個(gè)接口多種實(shí)現(xiàn)的依賴注入與動(dòng)態(tài)選擇看這篇就夠了
10個(gè)小技巧助您寫出高性能的ASP.NET Core代碼
用abp vNext快速開發(fā)Quartz.NET定時(shí)任務(wù)管理界面
在ASP.NET Core中創(chuàng)建基于Quartz.NET托管服務(wù)輕松實(shí)現(xiàn)作業(yè)調(diào)度
現(xiàn)身說(shuō)法:實(shí)際業(yè)務(wù)出發(fā)分析百億數(shù)據(jù)量下的多表查詢優(yōu)化
