外行人都能看懂的WebFlux,錯(cuò)過了血虧
前言
只有光頭才能變強(qiáng)。
文本已收錄至我的GitHub倉庫,歡迎Star:
https://github.com/ZhongFuCheng3y/3y
本文知識(shí)點(diǎn)架構(gòu):
知識(shí)點(diǎn)架構(gòu)如果有關(guān)注我公眾號(hào)文章的同學(xué)就會(huì)發(fā)現(xiàn),最近我不定時(shí)轉(zhuǎn)發(fā)了一些比較好的WebFlux的文章,因?yàn)槲易罱趯W(xué)。
我之前也說過,學(xué)習(xí)一項(xiàng)技術(shù)之前,先要了解為什么要學(xué)這項(xiàng)技術(shù)。其實(shí)這次學(xué)習(xí)WebFlux也沒有多大的原生動(dòng)力,主要是在我們組內(nèi)會(huì)輪流做一次技術(shù)分享,而我又不知道分享什么比較好…
之前在初學(xué)大數(shù)據(jù)相關(guān)的知識(shí),但是這一塊的時(shí)間線會(huì)拉得比較長,感覺趕不及小組內(nèi)分享(而組內(nèi)的同學(xué)又大部分都懂大數(shù)據(jù),就只有我一個(gè)菜雞,淚目)。所以,想的是:“要不我學(xué)點(diǎn)新東西搞搞?”。于是就花了點(diǎn)時(shí)間學(xué)WebFlux啦~
這篇文章主要講解什么是WebFlux,帶領(lǐng)大家入個(gè)門,希望對(duì)大家有所幫助(至少看完這篇文章,知道WebFlux是干嘛用的)
一、什么是WebFlux?
我們從Spring的官網(wǎng)拉下一點(diǎn)點(diǎn)就可以看到介紹WebFlux的地方了
WebFlux的簡介從官網(wǎng)的簡介中我們能得出什么樣的信息?
我們程序員往往根據(jù)不同的應(yīng)用場景選擇不同的技術(shù),有的場景適合用于同步阻塞的,有的場景適合用于異步非阻塞的。而
Spring5提供了一整套響應(yīng)式(非阻塞)的技術(shù)棧供我們使用(包括Web控制器、權(quán)限控制、數(shù)據(jù)訪問層等等)。而左側(cè)的圖則是技術(shù)棧的對(duì)比啦;
響應(yīng)式一般用Netty或者Servlet 3.1的容器(因?yàn)橹С之惒椒亲枞?,而Servlet技術(shù)棧用的是Servlet容器
在Web端,響應(yīng)式用的是WebFlux,Servlet用的是SpringMVC
…..
總結(jié)起來,WebFlux只是響應(yīng)式編程中的一部分(在Web控制端),所以一般我們用它與SpringMVC來對(duì)比。
二、如何理解響應(yīng)式編程?
在上面提到了響應(yīng)式編程(Reactive Programming),而WebFlux只是響應(yīng)式編程的其中一個(gè)技術(shù)棧而已,所以我們先來探討一下什么是響應(yīng)式編程
從維基百科里邊我們得到的定義:
reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change
響應(yīng)式編程(reactive programming)是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式
在維基百科上也舉了個(gè)小例子:
例子意思大概如下:
在命令式編程(我們的日常編程模式)下,式子
a=b+c,這就意味著a的值是由b和c計(jì)算出來的。如果b或者c后續(xù)有變化,不會(huì)影響到a的值在響應(yīng)式編程下,式子
a:=b+c,這就意味著a的值是由b和c計(jì)算出來的。但如果b或者c的值后續(xù)有變化,會(huì)影響到a的值
我認(rèn)為上面的例子已經(jīng)可以幫助我們理解變化傳遞(propagation of change)
那數(shù)據(jù)流(data stream)和聲明式(declarative)怎么理解呢?那可以提一提我們的Stream流了。之前寫過Lambda表達(dá)式和Stream流的文章,大家可以先去看看:
Lambda的語法是這樣的(Stream流的使用會(huì)涉及到很多Lambda表達(dá)式的東西,所以一般先學(xué)Lambda再學(xué)Stream流):
語法Stream流的使用分為三個(gè)步驟(創(chuàng)建Stream流、執(zhí)行中間操作、執(zhí)行最終操作):
三步走執(zhí)行中間操作實(shí)際上就是給我們提供了很多的API去操作Stream流中的數(shù)據(jù)(求和/去重/過濾)等等
中間操作 解釋說了這么多,怎么理解數(shù)據(jù)流和聲明式呢?其實(shí)是這樣的:
本來數(shù)據(jù)是我們自行處理的,后來我們把要處理的數(shù)據(jù)抽象出來(變成了數(shù)據(jù)流),然后通過API去處理數(shù)據(jù)流中的數(shù)據(jù)(是聲明式的)
比如下面的代碼;將數(shù)組中的數(shù)據(jù)變成數(shù)據(jù)流,通過顯式聲明調(diào)用.sum()來處理數(shù)據(jù)流中的數(shù)據(jù),得到最終的結(jié)果:
public?static?void?main(String[]?args)?{
????int[]?nums?=?{?1,?2,?3?};
????int?sum2?=?IntStream.of(nums).parallel().sum();
????System.out.println("結(jié)果為:"?+?sum2);
}
如圖下所示:
數(shù)據(jù)流與聲明式2.1 響應(yīng)式編程->異步非阻塞
上面講了響應(yīng)式編程是什么:
響應(yīng)式編程(reactive programming)是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式
也講解了數(shù)據(jù)流/變化傳遞/聲明式是什么意思,但說到響應(yīng)式編程就離不開異步非阻塞。
從Spring官網(wǎng)介紹WebFlux的信息我們就可以發(fā)現(xiàn)asynchronous, nonblocking 這樣的字樣,因?yàn)?strong style="font-size:inherit;color:rgb(233,105,0);">響應(yīng)式編程它是異步的,也可以理解成變化傳遞它是異步執(zhí)行的。
如下圖,合計(jì)的金額會(huì)受其他的金額影響(更新的過程是異步的):
合計(jì)的錢會(huì)因?yàn)槠渌慕痤~影響我們的JDK8 Stream流是同步的,它就不適合用于響應(yīng)式編程(但基礎(chǔ)的用法是需要懂的,因?yàn)轫憫?yīng)式流編程都是操作流嘛)
而在JDK9 已經(jīng)支持響應(yīng)式流了,下面我們來看一下
三、JDK9 Reactive
響應(yīng)式流的規(guī)范早已經(jīng)被提出了:里面提到了:
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure ? ----->http://www.reactive-streams.org/
翻譯再加點(diǎn)信息:
響應(yīng)式流(Reactive Streams)通過定義一組實(shí)體,接口和互操作方法,給出了實(shí)現(xiàn)異步非阻塞背壓的標(biāo)準(zhǔn)。第三方遵循這個(gè)標(biāo)準(zhǔn)來實(shí)現(xiàn)具體的解決方案,常見的有Reactor,RxJava,Akka Streams,Ratpack等。
規(guī)范里頭實(shí)際上就是定義了四個(gè)接口:
規(guī)范的四個(gè)接口Java 平臺(tái)直到 JDK 9才提供了對(duì)于Reactive的完整支持,JDK9也定義了上述提到的四個(gè)接口,在java.util.concurrent包上
Java的響應(yīng)式流接口一個(gè)通用的流處理架構(gòu)一般會(huì)是這樣的(生產(chǎn)者產(chǎn)生數(shù)據(jù),對(duì)數(shù)據(jù)進(jìn)行中間處理,消費(fèi)者拿到數(shù)據(jù)消費(fèi)):
流式處理架構(gòu)數(shù)據(jù)來源,一般稱為生產(chǎn)者(Producer)
數(shù)據(jù)的目的地,一般稱為消費(fèi)者(Consumer)
在處理時(shí),對(duì)數(shù)據(jù)執(zhí)行某些操作一個(gè)或多個(gè)處理階段。(Processor)
到這里我們再看回響應(yīng)式流的接口,我們應(yīng)該就能懂了:
Publisher(發(fā)布者)相當(dāng)于生產(chǎn)者(Producer)
Subscriber(訂閱者)相當(dāng)于消費(fèi)者(Consumer)
Processor就是在發(fā)布者與訂閱者之間處理數(shù)據(jù)用的
在響應(yīng)式流上提到了back pressure(背壓)這么一個(gè)概念,其實(shí)非常好理解。在響應(yīng)式流實(shí)現(xiàn)異步非阻塞是基于生產(chǎn)者和消費(fèi)者模式的,而生產(chǎn)者消費(fèi)者很容易出現(xiàn)的一個(gè)問題就是:生產(chǎn)者生產(chǎn)數(shù)據(jù)多了,就把消費(fèi)者給壓垮了。
而背壓說白了就是:消費(fèi)者能告訴生產(chǎn)者自己需要多少量的數(shù)據(jù)。這里就是Subscription接口所做的事。
下面我們來看看JDK9接口的方法,或許就更加能理解上面所說的話了:
//?發(fā)布者(生產(chǎn)者)
public?interface?Publisher<T>?{
????public?void?subscribe(Subscriber?super?T>?s);
}
//?訂閱者(消費(fèi)者)
public?interface?Subscriber<T>?{
????public?void?onSubscribe(Subscription?s);
????public?void?onNext(T?t);
????public?void?onError(Throwable?t);
????public?void?onComplete();
}
//?用于發(fā)布者與訂閱者之間的通信(實(shí)現(xiàn)背壓:訂閱者能夠告訴生產(chǎn)者需要多少數(shù)據(jù))
public?interface?Subscription?{
????public?void?request(long?n);
????public?void?cancel();
}
//?用于處理發(fā)布者?發(fā)布消息后,對(duì)消息進(jìn)行處理,再交由消費(fèi)者消費(fèi)
public?interface?Processor<T,R>?extends?Subscriber<T>,?Publisher<R>?{
}
3.1 看個(gè)例子
代碼中有大量的注釋,我就不多BB了,建議直接復(fù)制跑一下看看:
class?MyProcessor?extends?SubmissionPublisher<String>
????????implements?Processor<Integer,?String>?{
????private?Subscription?subscription;
????@Override
????public?void?onSubscribe(Subscription?subscription)?{
????????//?保存訂閱關(guān)系,?需要用它來給發(fā)布者響應(yīng)
????????this.subscription?=?subscription;
????????//?請求一個(gè)數(shù)據(jù)
????????this.subscription.request(1);
????}
????@Override
????public?void?onNext(Integer?item)?{
????????//?接受到一個(gè)數(shù)據(jù),?處理
????????System.out.println("處理器接受到數(shù)據(jù):?"?+?item);
????????//?過濾掉小于0的,?然后發(fā)布出去
????????if?(item?>?0)?{
????????????this.submit("轉(zhuǎn)換后的數(shù)據(jù):"?+?item);
????????}
????????//?處理完調(diào)用request再請求一個(gè)數(shù)據(jù)
????????this.subscription.request(1);
????????//?或者?已經(jīng)達(dá)到了目標(biāo),?調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
????????//?this.subscription.cancel();
????}
????@Override
????public?void?onError(Throwable?throwable)?{
????????//?出現(xiàn)了異常(例如處理數(shù)據(jù)的時(shí)候產(chǎn)生了異常)
????????throwable.printStackTrace();
????????//?我們可以告訴發(fā)布者,?后面不接受數(shù)據(jù)了
????????this.subscription.cancel();
????}
????@Override
????public?void?onComplete()?{
????????//?全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
????????System.out.println("處理器處理完了!");
????????//?關(guān)閉發(fā)布者
????????this.close();
????}
}
public?class?FlowDemo2?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//?1.?定義發(fā)布者,?發(fā)布的數(shù)據(jù)類型是?Integer
????????//?直接使用jdk自帶的SubmissionPublisher
????????SubmissionPublisher?publiser?=?new?SubmissionPublisher();
????????//?2.?定義處理器,?對(duì)數(shù)據(jù)進(jìn)行過濾,?并轉(zhuǎn)換為String類型
????????MyProcessor?processor?=?new?MyProcessor();
????????//?3.?發(fā)布者?和?處理器?建立訂閱關(guān)系
????????publiser.subscribe(processor);
????????//?4.?定義最終訂閱者,?消費(fèi)?String?類型數(shù)據(jù)
????????Subscriber?subscriber?=?new?Subscriber()?{
????????????private?Subscription?subscription;
????????????@Override
????????????public?void?onSubscribe(Subscription?subscription)?{
????????????????//?保存訂閱關(guān)系,?需要用它來給發(fā)布者響應(yīng)
????????????????this.subscription?=?subscription;
????????????????//?請求一個(gè)數(shù)據(jù)
????????????????this.subscription.request(1);
????????????}
????????????@Override
????????????public?void?onNext(String?item)?{
????????????????//?接受到一個(gè)數(shù)據(jù),?處理
????????????????System.out.println("接受到數(shù)據(jù):?"?+?item);
????????????????//?處理完調(diào)用request再請求一個(gè)數(shù)據(jù)
????????????????this.subscription.request(1);
????????????????//?或者?已經(jīng)達(dá)到了目標(biāo),?調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
????????????????//?this.subscription.cancel();
????????????}
????????????@Override
????????????public?void?onError(Throwable?throwable)?{
????????????????//?出現(xiàn)了異常(例如處理數(shù)據(jù)的時(shí)候產(chǎn)生了異常)
????????????????throwable.printStackTrace();
????????????????//?我們可以告訴發(fā)布者,?后面不接受數(shù)據(jù)了
????????????????this.subscription.cancel();
????????????}
????????????@Override
????????????public?void?onComplete()?{
????????????????//?全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
????????????????System.out.println("處理完了!");
????????????}
????????};
????????//?5.?處理器?和?最終訂閱者?建立訂閱關(guān)系
????????processor.subscribe(subscriber);
????????//?6.?生產(chǎn)數(shù)據(jù),?并發(fā)布
????????publiser.submit(-111);
????????publiser.submit(111);
????????//?7.?結(jié)束后?關(guān)閉發(fā)布者
????????//?正式環(huán)境?應(yīng)該放?finally?或者使用?try-resouce?確保關(guān)閉
????????publiser.close();
????????//?主線程延遲停止,?否則數(shù)據(jù)沒有消費(fèi)就退出
????????Thread.currentThread().join(1000);
????}
}
輸出的結(jié)果如下:
輸出的結(jié)果流程實(shí)際上非常簡單的:
流程參考資料:
https://yanbin.blog/java-9-talk-reactive-stream/#more-8877
https://blog.csdn.net/wudaoshihun/article/details/83070086
http://www.spring4all.com/article/6826
https://www.cnblogs.com/IcanFixIt/p/7245377.html
Java 8 的 Stream 主要關(guān)注在流的過濾,映射,合并,而 ?Reactive Stream 更進(jìn)一層,側(cè)重的是流的產(chǎn)生與消費(fèi),即流在生產(chǎn)與消費(fèi)者之間的協(xié)調(diào)
說白了就是:響應(yīng)式流是異步非阻塞+流量控制的(可以告訴生產(chǎn)者自己需要多少的量/取消訂閱關(guān)系)
展望響應(yīng)式編程的場景應(yīng)用:
比如一個(gè)日志監(jiān)控系統(tǒng),我們的前端頁面將不再需要通過“命令式”的輪詢的方式不斷向服務(wù)器請求數(shù)據(jù)然后進(jìn)行更新,而是在建立好通道之后,數(shù)據(jù)流從系統(tǒng)源源不斷流向頁面,從而展現(xiàn)實(shí)時(shí)的指標(biāo)變化曲線;
再比如一個(gè)社交平臺(tái),朋友的動(dòng)態(tài)、點(diǎn)贊和留言不是手動(dòng)刷出來的,而是當(dāng)后臺(tái)數(shù)據(jù)變化的時(shí)候自動(dòng)體現(xiàn)到界面上的。
四、入門WebFlux
扯了一大堆,終于回到WebFlux了。經(jīng)過上面的基礎(chǔ),我們現(xiàn)在已經(jīng)能夠得出一些結(jié)論的了:
WebFlux是Spring推出響應(yīng)式編程的一部分(web端)
響應(yīng)式編程是異步非阻塞的(是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式)
我們再回來看官網(wǎng)的圖:
mvc or webflux4.1 簡單體驗(yàn)WebFlux
Spring官方為了讓我們更加快速/平滑到WebFlux上,之前SpringMVC那套都是支持的。也就是說:我們可以像使用SpringMVC一樣使用著WebFlux。
支持SpringMVC那套WebFlux使用的響應(yīng)式流并不是用JDK9平臺(tái)的,而是一個(gè)叫做Reactor響應(yīng)式流庫。所以,入門WebFlux其實(shí)更多是了解怎么使用Reactor的API,下面我們來看看~
Reactor是一個(gè)響應(yīng)式流,它也有對(duì)應(yīng)的發(fā)布者(Publisher ),Reactor的發(fā)布者用兩個(gè)類來表示:
Mono(返回0或1個(gè)元素)
Flux(返回0-n個(gè)元素)
而消費(fèi)者則是Spring框架幫我們去完成
下面我們來看一個(gè)簡單的例子(基于WebFlux環(huán)境構(gòu)建):
//?阻塞5秒鐘
private?String?createStr()?{
????try?{
????????TimeUnit.SECONDS.sleep(5);
????}?catch?(InterruptedException?e)?{
????}
????return?"some?string";
}
//?普通的SpringMVC方法
@GetMapping("/1")
private?String?get1()?{
????log.info("get1?start");
????String?result?=?createStr();
????log.info("get1?end.");
????return?result;
}
//?WebFlux(返回的是Mono)
@GetMapping("/2")
private?Mono?get2()? {
????log.info("get2?start");
????Mono?result?=?Mono.fromSupplier(()?->?createStr());
????log.info("get2?end.");
????return?result;
}
首先,值得說明的是,我們構(gòu)建WebFlux環(huán)境啟動(dòng)時(shí),應(yīng)用服務(wù)器默認(rèn)是Netty的:
基于Netty我們分別來訪問一下SpringMVC的接口和WebFlux的接口,看一下有什么區(qū)別:
SpringMVC:
SpringMVCWebFlux:
WebFlux從調(diào)用者(瀏覽器)的角度而言,是感知不到有什么變化的,因?yàn)槎际堑玫却?s才返回?cái)?shù)據(jù)。但是,從服務(wù)端的日志我們可以看出,WebFlux是直接返回Mono對(duì)象的(而不是像SpringMVC一直同步阻塞5s,線程才返回)。
這正是WebFlux的好處:能夠以固定的線程來處理高并發(fā)(充分發(fā)揮機(jī)器的性能)。
WebFlux還支持服務(wù)器推送(SSE - >Server Send Event),我們來看個(gè)例子:
/**
?????*?Flux?:?返回0-n個(gè)元素
?????*?注:需要指定MediaType
?????*?@return
?????*/
@GetMapping(value?=?"/3",?produces?=?MediaType.TEXT_EVENT_STREAM_VALUE)
private?Flux?flux()? {
????Flux?result?=?Flux
????????.fromStream(IntStream.range(1,?5).mapToObj(i?->?{
????????????try?{
????????????????TimeUnit.SECONDS.sleep(1);
????????????}?catch?(InterruptedException?e)?{
????????????}
????????????return?"flux?data--"?+?i;
????????}));
????return?result;
}
效果就是每秒會(huì)給瀏覽器推送數(shù)據(jù):
服務(wù)器推送WebFlux我還沒寫完,這篇寫了WebFlux支持SpringMVC那套注解來開發(fā),下篇寫寫如何使用WebFlux另一種模式(Functional Endpoints)來開發(fā)以及一些常見的問題還需要補(bǔ)充一下~
兩年嘔心瀝血的文章:「面試題」「基礎(chǔ)」「進(jìn)階」這里全都有!
長按掃碼可關(guān)注獲取?
在看和分享對(duì)我非常重要!![]()
創(chuàng)作不易,各位的支持和認(rèn)可,就是我創(chuàng)作的最大動(dòng)力,我們下篇文章見!?求點(diǎn)贊?求關(guān)注???求分享??求留言?

點(diǎn)擊閱讀原文,關(guān)注我的GitHub
