探秘分布式異步通信(1)
分布式場景下通常會使用同步和異步方式來進行跨進程通信。同步方式較容易理解,它具有通信實時性高的特點。為了滿足系統(tǒng)的高吞吐,產生了異步通信方式。我們重點聊聊分布式場景下的異步通信方式。
狹義上的同步通信方式是指,當請求發(fā)送出去后,在沒有接收到請求的響應之前,調用方用戶線程只能阻塞等待返回,這個時間內做不了別的事情。
相對的,狹義的異步通信方式是指,當請求借助異步通信機制發(fā)送出去后,用戶線程可以繼續(xù)執(zhí)行別的操作,并不會阻塞等待應答返回;當請求返回后,會借助通知等方式通知調用者,或者通過回調函數來執(zhí)行后續(xù)的邏輯。
廣義上的同步通信方式是指,服務調用方發(fā)送一個請求,需要等待服務提供方執(zhí)行完成的結果,否則就不能繼續(xù)執(zhí)行后續(xù)邏輯。
相對應的,廣義的異步通信方式是指,上游的服務調用方只要確保請求消息成功發(fā)送就可以返回(我們稱這樣的調用方為消息生產者),繼續(xù)執(zhí)行后續(xù)的業(yè)務。業(yè)務邏輯的執(zhí)行交給下游的服務(我們稱這樣的服務為消息消費者)。這種異步執(zhí)行的業(yè)務邏輯通常是耗時的長事務,比如說物流發(fā)貨、視頻轉碼等業(yè)務場景。
本系列重點關注廣義的異步通信方式,目前主流的異步通信方式分為線程池、隊列以及回調機制。
1.線程池
基于線程池能夠實現異步通信。線程池是一種基于池化概念產生的線程集合。
線程池原理
線程池是為了避免頻繁的創(chuàng)建銷毀線程為系統(tǒng)帶來額外的內存、CPU壓力,而對線程進行了復用。當需要使用線程時,從池中取出一個使用,用完后再將此線程“返還”給線程池。 我們稱這里的線程為“工作線程”,通過下圖來形象表達上述的過程。

首先,向線程池中提交任務1。此時線程中已經存在6個工作線程,其中一個處于忙碌狀態(tài),正在執(zhí)行已提交的任務。 然后,線程池從剩余的5個空閑工作線程中選擇一個,分配給任務1,開始執(zhí)行任務1,如下圖所示。此時,工作線程1被分配給任務1,開始執(zhí)行任務1的業(yè)務邏輯,工作線程1的狀態(tài)變更為忙碌。

在任務被執(zhí)行完成后,工作線程并不忙著被關閉,而是被返回線程池中,狀態(tài)仍舊為空閑,方便執(zhí)行后續(xù)到來的任務。

總結來說,線程池就是將“頻繁的創(chuàng)建新的線程”變更為“從線程池中直接獲取一個線程”;將“在執(zhí)行任務結束后關閉線程”變更為“向池中歸還線程”。這樣就避免了頻繁創(chuàng)建、銷毀線程帶來的額外時間和空間的開銷。
使用線程池實現異步通信
從上面的介紹中我們知道,線程池是通過將任務分配給工作線程,實現對任務的執(zhí)行的。即,如果要實現對外的接口調用(或者網絡通信),則完全可以在用戶線程中向線程池中提交一個接口調用任務,然后工作線程就可以立即返回執(zhí)行其他業(yè)務操作,待接口調用返回后通知工作線程對結果進行處理即可。

當用戶線程提交任務到線程池成功后,就接著執(zhí)行其他業(yè)務邏輯了。在程池中工作線程對外部接口的調用返回后,會通知用戶線程繼續(xù)返回的結果。

?實際上,在Java的JUC包中已經實現了這種通知機制,通過線程池+Callable+Future的方式,就能夠很方便、快捷地實現高性能的異步通信機制。 而線程池在Java的JUC包中也提供了參考實現,即ThreadPoolExecutor。另外,Java還提供了Executors工具類,以便我們快速創(chuàng)建模板線程池。這些優(yōu)秀的代碼實現在實戰(zhàn)開發(fā)中都得到了廣泛應用。
?
[實戰(zhàn)]使用線程池實現異步通信
下面通過一段代碼直觀感受一下在Java中使用線程池實現異步通信的過程。
?實現遠程訂單服務,MockRemoteOrderService.java,用于模擬外部系統(tǒng)的訂單查詢功能。
?
/**
* @className MockRemoteOrderService
* @desc 模擬遠端訂單查詢服務
*/
public class MockRemoteOrderService {
private static final Logger LOGGER = LoggerFactory.getLogger(MockRemoteOrderService.class);
public OrderQueryResponse queryOrder(OrderQueryRequest orderQueryRequest) {
try {
LOGGER.info("queryOrder 開始, orderQueryRequest:{}", JSON.toJSONString(orderQueryRequest));
// 模擬網絡耗時 800ms
Thread.sleep(800);
OrderQueryResponse orderQueryResponse =
new OrderQueryResponse(orderQueryRequest.getOrderId(), 19.99);
LOGGER.info("queryOrder 成功, orderQueryResponse:{}", JSON.toJSONString(orderQueryResponse));
return orderQueryResponse;
} catch (Exception e) {
LOGGER.error("queryOrder 異常, orderQueryRequest:{}", JSON.toJSONString(orderQueryRequest), e);
return null;
}
}
}
通過Thread.sleep(800),模擬網絡耗時。
?實現訂單查詢請求和響應實體——OrderQueryRequest.java和OrderQueryResponse.java。
?
/**
* @className OrderQueryRequest
* @desc 訂單查詢請求
*/
public class OrderQueryRequest {
/**訂單號*/
private String orderId;
public OrderQueryRequest(String orderId) {
this.orderId = orderId;
}
省略getter、setter
/**
* @className OrderQueryResponse
* @desc 訂單查詢結果響應
*/
public class OrderQueryResponse {
/**訂單號*/
private String orderId;
/**訂單金額*/
private Double orderAmount;
public OrderQueryResponse(String orderId, Double orderAmount) {
this.orderId = orderId;
this.orderAmount = orderAmount;
}
省略getter、setter
?實現本地訂單查詢業(yè)務邏輯LocalBizService.java。
?
/**
* @className LocalBizService
* @desc 本地業(yè)務邏輯
*/
public class LocalBizService {
private static final Logger LOGGER = LoggerFactory.getLogger(LocalBizService.class);
MockRemoteOrderService mockRemoteOrderService;
/**自定義訂單查詢線程池*/
private static final ExecutorService ORDER_QUERY_THREAD_POOL =
new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() + 1,
Runtime.getRuntime().availableProcessors() * 2,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(500),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "order-query-thread-pool");
thread.setDaemon(true);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy());
public LocalBizService(MockRemoteOrderService mockRemoteOrderService){
this.mockRemoteOrderService = mockRemoteOrderService;
}
public void execute(String orderId){
// 參數校驗
if (StringUtils.isBlank(orderId)) {
throw new RuntimeException("orderId為空! orderId:" + orderId);
}
// 組裝請求
OrderQueryRequest orderQueryRequest = new OrderQueryRequest(orderId);
// 提交訂單查詢任務
ORDER_QUERY_THREAD_POOL.submit(new Runnable() {
@Override
public void run() {
mockRemoteOrderService.queryOrder(orderQueryRequest);
}
});
LOGGER.info("訂單查詢任務提交成功, orderId:{}", orderId);
// 其他業(yè)務邏輯
doSomething();
}
private void doSomething() {
LOGGER.info("查詢訂單期間繼續(xù)執(zhí)行其他業(yè)務邏輯......");
}
public static void main(String[] args) {
// 實例化mock遠程訂單查詢服務
MockRemoteOrderService mockRemoteOrderService = new MockRemoteOrderService();
// 實例化本地業(yè)務邏輯
LocalBizService localBizService = new LocalBizService(mockRemoteOrderService);
// 提交訂單查詢任務
String orderId = "ORDER_" + UUID.randomUUID().toString();
localBizService.execute(orderId);
while (true) {
// hold住主線程
}
}
}
在main()方法中,首先實例化MockRemoteOrderService對象,模擬調用遠程訂單查詢服務; 接著實例化LocalBizService對象,通過構造注入MockRemoteOrderService對象,模擬本地業(yè)務邏輯; 初始化一個訂單Id,通過調用localBizService.execute(orderId)方法,通過線程池提交了一個訂單查詢任務,模擬發(fā)起一次異步的遠程通信; 在execute()方法中,通過線程池的submit()方法提交了任務到線程池后,繼續(xù)執(zhí)行后續(xù)的業(yè)務邏輯即doSomething()。
最后運行main()方法,運行結果如下:
23:38:59.070 [main] INFO com.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.LocalBizService - 訂單查詢任務提交成功, orderId:ORDER_98c0e2e1-964a-4f94-9ec6-f50669d700dd
23:38:59.075 [main] INFO com.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.LocalBizService - 查詢訂單期間繼續(xù)執(zhí)行其他業(yè)務邏輯......
23:38:59.150 [order-query-thread-pool] INFO com.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.service.MockRemoteOrderService - queryOrder 開始, orderQueryRequest:{"orderId":"ORDER_98c0e2e1-964a-4f94-9ec6-f50669d700dd"}
23:38:59.976 [order-query-thread-pool] INFO com.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.service.MockRemoteOrderService - queryOrder 成功, orderQueryResponse:{"orderAmount":19.99,"orderId":"ORDER_98c0e2e1-964a-4f94-9ec6-f50669d700dd"}
通過日志打印可以直觀的看到,在任務提交成功后,主線程繼續(xù)執(zhí)行后續(xù)業(yè)務邏輯,而耗時的訂單查詢任務會通過線程池的工作線程異步執(zhí)行。
這就是通過線程池實現異步通信的代碼實現。
消息隊列
廣義上的隊列分為進程內的隊列及進程間隊列,例如,Java中的ArrayBlockingQueue、LinkedBlockingQueue等就屬于進程內的隊列;而ActiveMQ、Kafka、RocketMQ、RabbitMQ等就屬于進程間的隊列。我們在日常開發(fā)中說的消息隊列,如果沒有特指,往往指的是進程間的隊列。
那么在分布式系統(tǒng)中,如何基于進程間的消息隊列實現異步通信呢? 先看一個實際的案例:這是一個簡化版的分布式電商系統(tǒng),它的訂單中心中保存了核心的訂單交易數據。在系統(tǒng)建設的初期,商品中心進行庫存扣減時,需要關聯訂單數據;物流中心在發(fā)貨時也需要同步訂單數據;支付中心在支付發(fā)起時,也需要同步訂單數據。此時的交互過程如圖所示。

訂單中心直接調用商品中心、物流中心、支付中心提供的數據同步接口,將訂單相關數據同步發(fā)送給了其他的系統(tǒng)。這看起來很正常,也沒有什么問題發(fā)生。
隨著系統(tǒng)規(guī)模逐漸增長,又有更多的系統(tǒng)需要獲取訂單數據進行各自的業(yè)務操作和數據分析,比如:風控中心需要通過訂單數據進行風控相關的操作;倉儲中心需要通過分析訂單數據對貨品管理進行調控;廣告投放中心需要通過訂單數據對廣告轉化率進行計算和優(yōu)化;數據中心需要通過對訂單數據進行匯總清洗,產出報表和大盤;財務中心需要通過訂單數據建立起實時的結算體系等。
于是訂單中心逐步增加了對其他系統(tǒng)的數據同步的代碼,用以支持它們各自的需求。

由于采用的是同步接口對接方式進行數據同步,因此訂單中心中不得不對新增的代碼添加異常處理代碼,防止因為下游服務異常而導致訂單中心自己的業(yè)務出現級聯影響。隨著業(yè)務逐步發(fā)展,下游系統(tǒng)也在進行改造,下游系統(tǒng)的數據同步接口一旦發(fā)生修改,比如增加字段、變更服務地址等,都需要通知訂單中心進行代碼修改,一時間訂單中心的研發(fā)人員苦不堪言。
實際的業(yè)務場景中,訂單中心面臨的下游系統(tǒng)不只圖上的這些。在大型互聯網公司中,下游系統(tǒng)本身也是一個復雜的分布式系統(tǒng),其中包含了數十上百的子服務。這樣一來,訂單中心需要對接的系統(tǒng)可能會達到數百上千個,甚至更多。這其中的調用復雜關系可想而知,代碼的復雜程度也難以想象,一個數據同步接口中動輒幾千行代碼不足為奇。
長期發(fā)展下去,勢必會造成線上問題頻出,況且數據同步接口并非核心業(yè)務邏輯,但是卻需要投入大量的成本去維護,降低了上下游的迭代效率,還使得開發(fā)運維人員疲于奔命,對于個人亦或者企業(yè)而言都是不值得的。而這一切的根源都是因為采用了接口同步調用的方式去傳遞訂單數據。而同步調用本身就是一種強耦合的通信方式。
那么改造的思路就顯而易見,就是想辦法優(yōu)化數據傳輸方式,降低系統(tǒng)間的耦合度。改造思路就是通過使用消息隊列來實現系統(tǒng)之間的松耦合。

上圖所示,訂單中心不再像之前那樣通過調用下游各個中心的訂單同步接口,而是直接將訂單同步消息發(fā)送到MQ消息隊列(如Kakfa、RocketMQ等)中的訂單同步Topic。下游的中心如果對訂單消息感興趣,則自行訂閱該Topic,拉取消息進行消費即可實現訂單同步。一旦不需要同步訂單消息,則下游的中心主動取消訂閱Topic即可,上游的訂單中心完全不需要感知下游如何去消費消息。而且一旦又有新的下游的服務也需要進行訂單同步,實現新的業(yè)務邏輯,則該中心只需要實現消費訂單消息的邏輯即可,直接與MQ消息隊列進行交互,同樣不需要上游的訂單消息進行感知。
對于上游的訂單消息而言,它要做的就是專注于將訂單同步消息生產出來并發(fā)送(投遞)到MQ消息隊列,并保證消息發(fā)送成功。實現了與下游各種中心的松耦合,則在代碼量大幅度減少的同時,訂單中心的穩(wěn)定性得到了提高。原先訂單中心需要保證訂單同步請求被下游的系統(tǒng)接收并處理完成才能繼續(xù)后續(xù)的操作。
假設調用每個服務的平均響應時長是100ms(實際情況中可能會由于網絡擁塞變得時間更長,這里是一種理想化的情況),那么上圖中的8個中心同步數據的總時長為100ms × 8 = 800ms。而通過MQ消息隊列優(yōu)化后的平均響應時長能夠降低數十毫秒,比如20ms。這是因為,消息投遞本身是一個高性能的操作,只要保證消息發(fā)送成功并被MQ消息隊列接收并持久化即可。后續(xù)的消費者對消息的消費過程,上游的發(fā)送者完全不用關注。

即當訂單中心在邏輯0中完成發(fā)送訂單同步消息的操作后,就可以繼續(xù)執(zhí)行邏輯1,邏輯2等后續(xù)的業(yè)務邏輯。
對于整個鏈路而言,訂單同步過程就變成了一種異步的通信方式,這樣降低了上游業(yè)務邏輯執(zhí)行耗時,解耦了上下游之間的交互過程,提升了系統(tǒng)的處理能力和吞吐量。對于用戶而言,等待時長也明顯變短,提升了用戶體驗。
[實戰(zhàn)]使用消息隊列實現異步訂單同步
下面通過一段代碼模擬改造后的場景,直觀地體驗一下如何通過消息隊列實現異步通信的目的。這里使用到的消息隊列是RocketMQ。關于RocketMQ的搭建和原理會在后續(xù)的章節(jié)中詳細展開,這里主要是展示具體的使用方法。
?定義一個訂單同步類,它的屬性是需要通過消息發(fā)送方式異步同步給下游各種中心.
?
/**
* @className OrderInfo
* @desc 訂單同步實體
*/
public class OrderSyncInfo {
/**訂單id*/
private String orderId;
/**訂單金額*/
private Double orderAmount;
/**支付金額*/
private Double payAmount;
/**優(yōu)惠券id*/
private long voucherId;
/**產品id*/
private long productId;
/**商品名稱*/
private String productName;
/**創(chuàng)建時間*/
private Date createTime;
/**發(fā)貨時間*/
private Date deliverTime;
public OrderSyncInfo(String orderId,
Double orderAmount,
Double payAmount,
long voucherId,
long productId,
String productName,
Date createTime,
Date deliverTime) {
this.orderId = orderId;
this.orderAmount = orderAmount;
this.payAmount = payAmount;
this.voucherId = voucherId;
this.productId = productId;
this.productName = productName;
this.createTime = createTime;
this.deliverTime = deliverTime;
}
省略getter setter
在上面的代碼中,通過構造方法傳遞對應的訂單屬性,即可實現對訂單同步對象的初始化。
?編寫一個訂單同步生產者類OrderInfoSyncProducer.java,用以發(fā)送訂單同步消息。
?
/**
* @className OrderInfoSyncProducer
* @desc 訂單信息同步生產者
*/
public class OrderInfoSyncProducer {
private DefaultMQProducer producer;
public OrderInfoSyncProducer(String producerGroup) {
// 初始化生產者實例并指定生產者組
producer = new DefaultMQProducer(producerGroup);
// NameServer地址
producer.setNamesrvAddr("192.168.1.108");
try {
producer.start();
} catch (MQClientException e) {
throw new RuntimeException("DefaultMQProducer init error!", e);
}
}
/**
* 發(fā)送消息
* @param orderSyncInfo
* @return
*/
public SendResult send(OrderSyncInfo orderSyncInfo) {
// 序列化訂單對象為字符串消息體
String messageBody = JSON.toJSONString(orderSyncInfo);
try {
// 構造Message對象
Message message = new Message(
"order_info_sync_topic" /* Topic */,
"sync" /* Tag */,
messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 發(fā)送消息
SendResult sendResult = producer.send(message);
return sendResult;
} catch (Exception e) {
throw new RuntimeException("send orderInfo sync message error! orderInfo:" + messageBody, e);
}
}
/**
* 關閉生產者
*/
public void shutdown() {
producer.shutdown();
}
}
通過構造方法,傳遞了生產者組。一般生產者組都會帶業(yè)務屬性,方便維護;同時指定了NameServer地址,用于發(fā)現broker地址; 提供了send()方法,用于將OrderSyncInfo對象序列化為JSON格式的消息體,通過Message對象包裝后指定要發(fā)送到消息隊列的Topic,通過producer的send()方法發(fā)送出去。
?編寫訂單同步消費者的代碼。不同的中心均可以通過該類實現對訂單的同步邏輯。
?
/**
* @className OrderInfoSyncConsumer
* @desc 訂單信息同步消費者
*/
public class OrderInfoSyncConsumer {
private DefaultMQPushConsumer consumer;
private String consumerGroup;
public OrderInfoSyncConsumer(String consumerGroup) {
this.consumerGroup = consumerGroup;
// 根據consumerGroup初始化DefaultMQPushConsumer
consumer = new DefaultMQPushConsumer(consumerGroup);
// 集群消費模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 從哪里開始消費,此處為從offset頭部開始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// NameServer地址
consumer.setNamesrvAddr("192.168.1.108");
// 訂閱主題
try {
consumer.subscribe("order_info_sync_topic", "*");
} catch (MQClientException e) {
throw new RuntimeException("subscribe topic error!", e);
}
//注冊消息消費回調,用以執(zhí)行消費邏輯
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
public void start() {
try {
consumer.start();
} catch (MQClientException e) {
throw new RuntimeException("start consumer error! consumerGroup:" + consumerGroup, e);
}
System.out.printf("Consumer Started. consumerGroup:%s", consumerGroup);
}
}
首先通過構造方法傳遞當前消費者實例的消費者組consumerGroup,通過consumerGroup對DefaultMQPushConsumer進行實例化; 為DefaultMQPushConsumer設置各種屬性,包括:從何處開始消費、消費模式、NameServer地址、注冊消息消費回調接口(案例中通過匿名內部類實現默認的消費邏輯)、通過subscribe訂閱訂單同步主題; consumeMessage方法即為核心的消費業(yè)務邏輯; 通過調用DefaultMQPushConsumer的start方法啟動消費者,并開始執(zhí)行消費。
最后看一下代碼具體是如何使用的。
/**
* @className Client
* @desc 訂單同步測試類
*/
public class Client {
public static void main(String[] args) {
// 生產者發(fā)送訂單同步消息
OrderInfoSyncProducer orderInfoSyncProducer = new OrderInfoSyncProducer("order_info_sync_group");
OrderSyncInfo orderSyncInfo = new OrderSyncInfo(
"order_" + UUID.randomUUID().toString(),
19.99,
19.99,
100001,
200001,
"IPhone11手機殼",
new Date(System.currentTimeMillis()),
new Date(System.currentTimeMillis())
);
// 發(fā)送消息
orderInfoSyncProducer.send(orderSyncInfo);
// 訂單中心消費訂單同步消息
OrderInfoSyncConsumer orderCenterConsumer = new OrderInfoSyncConsumer("order_center_sync_group");
orderCenterConsumer.start();
// 數據中心消費訂單同步消息
OrderInfoSyncConsumer dataCenterConsumer = new OrderInfoSyncConsumer("data_center_sync_group");
dataCenterConsumer.start();
// 倉儲中心消費訂單同步消息
OrderInfoSyncConsumer storageCenterConsumer = new OrderInfoSyncConsumer("storage_center_sync_group");
storageCenterConsumer.start();
}
}
下面對上一段代碼中的main()方法進行簡單的講解:
初始化了生產組名為“order_info_sync_group”的訂單同步生產者實例。這樣在構造方法執(zhí)行完成后會自行調用start啟動生產者; 實例化了一個訂單同步對象。通過構造方法賦值屬性,可以看到該訂單同步實體為一個IPhone11手機殼。 調用send()方法發(fā)送消息。 實例化了orderCenterConsumer(訂單中心訂單同步消費者)、dataCenterConsumer(數據中心訂單同步消費者)、storageCenterConsumer(倉儲中心訂單同步消費者)等消費者實例。在構造方法初始化過程中,每個消費者組會分別訂閱訂單同步主題,并分別開啟消費,互不影響,這與RocketMQ消費消息的機制有關,此處不詳細展開。最后分別調用各自的start()方法開啟消費。
通過代碼案例可以發(fā)現,下游的各種中心只要實現自己的OrderInfoSyncConsumer邏輯并開啟消費,就能夠實現在上游生產者不耦合下游消費者的前提下,完成跨多個系統(tǒng)的異步訂單同步功能。這就是消息隊列的解耦特性在實戰(zhàn)應用的體現。
通過圖例分析、案例解析結合代碼,我們對消息隊列的異步通信方式建立起了一個感性的認知。
下一篇中,我們將繼續(xù)對異步通信中的回調機制進行學習。敬請期待。
