聊聊異步編程的 7 種實現(xiàn)方式
最近有很多小伙伴給我留言,能不能總結下異步編程,今天就和大家簡單聊聊這個話題。
早期的系統(tǒng)是同步的,容易理解,我們來看個例子
同步編程

當用戶創(chuàng)建一筆電商交易訂單時,要經(jīng)歷的業(yè)務邏輯流程還是很長的,每一步都要耗費一定的時間,那么整體的RT就會比較長。
于是,聰明的人們開始思考能不能將一些非核心業(yè)務從主流程中剝離出來,于是有了異步編程雛形。
異步編程是讓程序并發(fā)運行的一種手段。它允許多個事件同時發(fā)生,當程序調用需要長時間運行的方法時,它不會阻塞當前的執(zhí)行流程,程序可以繼續(xù)運行。

核心思路:采用多線程優(yōu)化性能,將串行操作變成并行操作。異步模式設計的程序可以顯著減少線程等待,從而在高吞吐量場景中,極大提升系統(tǒng)的整體性能,顯著降低時延。
接下來,我們來講下異步有哪些編程實現(xiàn)方式
一、線程 Thread
直接繼承 Thread類 是創(chuàng)建異步線程最簡單的方式。
首先,創(chuàng)建Thread子類,普通類或匿名內部類方式;然后創(chuàng)建子類實例;最后通過start()方法啟動線程。
public class AsyncThread extends Thread{
@Override
public void run() {
System.out.println("當前線程名稱:" + this.getName() + ", 執(zhí)行線程名稱:" + Thread.currentThread().getName() + "-hello");
}
}
public static void main(String[] args) {
// 模擬業(yè)務流程
// .......
// 創(chuàng)建異步線程
AsyncThread asyncThread = new AsyncThread();
// 啟動異步線程
asyncThread.start();
}
當然如果每次都創(chuàng)建一個 Thread線程,頻繁的創(chuàng)建、銷毀,浪費系統(tǒng)資源。我們可以采用線程池
@Bean(name = "executorService")
public ExecutorService downloadExecutorService() {
return new ThreadPoolExecutor(20, 40, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
new ThreadFactoryBuilder().setNameFormat("defaultExecutorService-%d").build(),
(r, executor) -> log.error("defaultExecutor pool is full! "));
}
將業(yè)務邏輯封裝到 Runnable 或 Callable 中,交由 線程池 來執(zhí)行

二、Future
上述方式雖然達到了多線程并行處理,但有些業(yè)務不僅僅要執(zhí)行過程,還要獲取執(zhí)行結果。
Java 從1.5版本開始,提供了 Callable 和 Future,可以在任務執(zhí)行完畢之后得到任務執(zhí)行結果。
當然也提供了其他功能,如:取消任務、查詢任務是否完成等
Future類位于java.util.concurrent包下,接口定義:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
方法描述:
cancel():取消任務,如果取消任務成功返回true,如果取消任務失敗則返回false isCancelled():表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true isDone():表示任務是否已經(jīng)完成,如果完成,返回true get():獲取執(zhí)行結果,這個方法會產(chǎn)生阻塞,會一直等到任務執(zhí)行完畢才返回 get(long timeout, TimeUnit unit):用來獲取執(zhí)行結果,如果在指定時間內,還沒獲取到結果,就直接返回null
代碼示例:
public class CallableAndFuture {
public static ExecutorService executorService = new ThreadPoolExecutor(4, 40,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return "異步處理,Callable 返回結果";
}
}
public static void main(String[] args) {
Future<String> future = executorService.submit(new MyCallable());
try {
System.out.println(future.get());
} catch (Exception e) {
// nodo
} finally {
executorService.shutdown();
}
}
}
Future 表示一個可能還沒有完成的異步任務的結果,通過 get 方法獲取執(zhí)行結果,該方法會阻塞直到任務返回結果。
三、FutureTask
FutureTask 實現(xiàn)了 RunnableFuture 接口,則 RunnableFuture 接口繼承了 Runnable 接口和 Future 接口,所以可以將 FutureTask 對象作為任務提交給 ThreadPoolExecutor 去執(zhí)行,也可以直接被 Thread 執(zhí)行;又因為實現(xiàn)了 Future 接口,所以也能用來獲得任務的執(zhí)行結果。
FutureTask 構造函數(shù):
public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)
FutureTask 常用來封裝 Callable 和 Runnable,可以作為一個任務提交到線程池中執(zhí)行。除了作為一個獨立的類之外,也提供了一些功能性函數(shù)供我們創(chuàng)建自定義 task 類使用。
FutureTask 線程安全由CAS來保證。
ExecutorService executor = Executors.newCachedThreadPool();
// FutureTask包裝callbale任務,再交給線程池執(zhí)行
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println("子線程開始計算:");
Integer sum = 0;
for (int i = 1; i <= 100; i++)
sum += i;
return sum;
});
// 線程池執(zhí)行任務, 運行結果在 futureTask 對象里面
executor.submit(futureTask);
try {
System.out.println("task運行結果計算的總和為:" + futureTask.get());
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdown();
Callable 和 Future 的區(qū)別:Callable 用于產(chǎn)生結果,F(xiàn)uture 用于獲取結果

如果是對多個任務多次自由串行、或并行組合,涉及多個線程之間同步阻塞獲取結果,F(xiàn)uture 代碼實現(xiàn)會比較繁瑣,需要我們手動處理各個交叉點,很容易出錯。
四、異步框架 CompletableFuture
Future 類通過 get() 方法阻塞等待獲取異步執(zhí)行的運行結果,性能比較差。
JDK1.8 中,Java 提供了 CompletableFuture 類,它是基于異步函數(shù)式編程。相對阻塞式等待返回結果,CompletableFuture 可以通過回調的方式來處理計算結果,實現(xiàn)了異步非阻塞,性能更優(yōu)。
優(yōu)點:
異步任務結束時,會自動回調某個對象的方法 異步任務出錯時,會自動回調某個對象的方法 主線程設置好回調后,不再關心異步任務的執(zhí)行
泡茶示例:

(內容摘自:極客時間的《Java 并發(fā)編程實戰(zhàn)》)
//任務1:洗水壺->燒開水
CompletableFuture<Void> f1 =
CompletableFuture.runAsync(() -> {
System.out.println("T1:洗水壺...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1:燒開水...");
sleep(15, TimeUnit.SECONDS);
});
//任務2:洗茶壺->洗茶杯->拿茶葉
CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(() -> {
System.out.println("T2:洗茶壺...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2:洗茶杯...");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2:拿茶葉...");
sleep(1, TimeUnit.SECONDS);
return "龍井";
});
//任務3:任務1和任務2完成后執(zhí)行:泡茶
CompletableFuture<String> f3 =
f1.thenCombine(f2, (__, tf) -> {
System.out.println("T1:拿到茶葉:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
});
//等待任務3執(zhí)行結果
System.out.println(f3.join());
}
CompletableFuture 提供了非常豐富的API,大約有50種處理串行,并行,組合以及處理錯誤的方法。
更多內容移步之前寫的一篇文章,搞定 CompletableFuture,并發(fā)異步編程和編寫串行程序還有什么區(qū)別?
五、 SpringBoot 注解 @Async
除了硬編碼的異步編程處理方式,SpringBoot 框架還提供了 注解式 解決方案,以 方法體 為邊界,方法體內部的代碼邏輯全部按異步方式執(zhí)行。
首先,使用 @EnableAsync 啟用異步注解
@SpringBootApplication
@EnableAsync
public class StartApplication {
public static void main(String[] args) {
SpringApplication.run(StartApplication.class, args);
}
}
自定義線程池:
@Configuration
@Slf4j
public class ThreadPoolConfiguration {
@Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
public ThreadPoolExecutor systemCheckPoolExecutorService() {
return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10000),
new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
(r, executor) -> log.error("system pool is full! "));
}
}
在異步處理的方法上添加注解 @Async ,當對 execute 方法 調用時,通過自定義的線程池 defaultThreadPoolExecutor 異步化執(zhí)行 execute 方法
@Service
public class AsyncServiceImpl implements AsyncService {
@Async("defaultThreadPoolExecutor")
public Boolean execute(Integer num) {
System.out.println("線程:" + Thread.currentThread().getName() + " , 任務:" + num);
return true;
}
}
用 @Async 注解標記的方法,稱為異步方法。在spring boot應用中使用 @Async 很簡單:
調用異步方法類上或者啟動類加上注解 @EnableAsync 在需要被異步調用的方法外加上 @Async 所使用的 @Async 注解方法的類對象應該是Spring容器管理的bean對象;
六、Spring ApplicationEvent 事件
事件機制在一些大型項目中被經(jīng)常使用,Spring 專門提供了一套事件機制的接口,滿足了架構原則上的解耦。
ApplicationContext 通過 ApplicationEvent 類和 ApplicationListener 接口進行事件處理。如果將實現(xiàn) ApplicationListener 接口的 bean 注入到上下文中,則每次使用 ApplicationContext 發(fā)布 ApplicationEvent 時,都會通知該 bean。本質上,這是標準的觀察者設計模式。
ApplicationEvent 是由 Spring 提供的所有 Event 類的基類
首先,自定義業(yè)務事件子類,繼承自 ApplicationEvent,通過泛型注入業(yè)務模型參數(shù)類。相當于 MQ 的消息體。
public class OrderEvent extends AbstractGenericEvent<OrderModel> {
public OrderEvent(OrderModel source) {
super(source);
}
}
然后,編寫事件監(jiān)聽器。ApplicationListener 接口是由 Spring 提供的事件訂閱者必須實現(xiàn)的接口,我們需要定義一個子類,繼承 ApplicationListener。相當于 MQ 的消費端
@Component
public class OrderEventListener implements ApplicationListener<OrderEvent> {
@Override
public void onApplicationEvent(OrderEvent event) {
System.out.println("【OrderEventListener】監(jiān)聽器處理!" + JSON.toJSONString(event.getSource()));
}
}
最后,發(fā)布事件,把某個事件告訴所有與這個事件相關的監(jiān)聽器。相當于 MQ 的生產(chǎn)端。
OrderModel orderModel = new OrderModel();
orderModel.setOrderId((long) i);
orderModel.setBuyerName("Tom-" + i);
orderModel.setSellerName("judy-" + i);
orderModel.setAmount(100L);
// 發(fā)布Spring事件通知
SpringUtils.getApplicationContext().publishEvent(new OrderEvent(orderModel));
加個餐:
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[生產(chǎn)端]線程:http-nio-8090-exec-1,發(fā)布事件 1
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[生產(chǎn)端]線程:http-nio-8090-exec-1,發(fā)布事件 2
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
[生產(chǎn)端]線程:http-nio-8090-exec-1,發(fā)布事件 3
上面是跑了個demo的運行結果,我們發(fā)現(xiàn)無論生產(chǎn)端還是消費端,使用了同一個線程 http-nio-8090-exec-1,Spring 框架的事件機制默認是同步阻塞的。只是在代碼規(guī)范方面做了解耦,有較好的擴展性,但底層還是采用同步調用方式。
那么問題來了,如果想實現(xiàn)異步調用,如何處理?
我們需要手動創(chuàng)建一個 SimpleApplicationEventMulticaster,并設置 TaskExecutor,此時所有的消費事件采用異步線程執(zhí)行。
@Component
public class SpringConfiguration {
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor") ThreadPoolExecutor defaultThreadPoolExecutor) {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor);
return simpleApplicationEventMulticaster;
}
}
我們看下改造后的運行結果:
[生產(chǎn)端]線程:http-nio-8090-exec-1,發(fā)布事件 1
[生產(chǎn)端]線程:http-nio-8090-exec-1,發(fā)布事件 2
[生產(chǎn)端]線程:http-nio-8090-exec-1,發(fā)布事件 3
[消費端]線程:default-executor-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[消費端]線程:default-executor-2,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[消費端]線程:default-executor-0,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
SimpleApplicationEventMulticaster 這個我們自己實例化的 Bean 與系統(tǒng)默認的加載順序如何?會不會有沖突?
查了下 Spring 源碼,處理邏輯在 AbstractApplicationContext#initApplicationEventMulticaster 方法中,通過 beanFactory 查找是否有自定義的 Bean,如果沒有,容器會自己 new 一個 SimpleApplicationEventMulticaster 對象注入到容器中。

代碼地址:https://github.com/aalansehaiyang/wx-project
七、消息隊列
異步架構是互聯(lián)網(wǎng)系統(tǒng)中一種典型架構模式,與同步架構相對應。而消息隊列天生就是這種異步架構,具有超高吞吐量和超低時延。
消息隊列異步架構的主要角色包括消息生產(chǎn)者、消息隊列和消息消費者。

消息生產(chǎn)者就是主應用程序,生產(chǎn)者將調用請求封裝成消息發(fā)送給消息隊列。
消息隊列的職責就是緩沖消息,等待消費者消費。根據(jù)消費方式又分為點對點模式和發(fā)布訂閱模式兩種。
消息消費者,用來從消息隊列中拉取、消費消息,完成業(yè)務邏輯處理。
當然市面上消息隊列框架非常多,常見的有RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等

不同的消息隊列的功能特性會略有不同,但整體架構類似,這里就不展開了。
我們只需要記住一個關鍵點,借助消息隊列這個中間件可以高效的實現(xiàn)異步編程。

往期推薦

阿里出品,SpringBoot自動化部署神器!

Java 中 for 和 foreach 哪個性能高?

如何優(yōu)雅的寫 Controller 層代碼?

