<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          外行人都能看懂的WebFlux,錯(cuò)過了血虧

          共 3229字,需瀏覽 7分鐘

           ·

          2019-11-16 23:22

          前言

          只有光頭才能變強(qiáng)。

          文本已收錄至我的GitHub倉庫,歡迎Star:

          https://github.com/ZhongFuCheng3y/3y

          本文知識(shí)點(diǎn)架構(gòu):

          e7d26d52624bc7c050e43b18df26d26f.webp知識(shí)點(diǎn)架構(gòu)

          如果有關(guān)注我公眾號(hào)文章的同學(xué)就會(huì)發(fā)現(xiàn),最近我不定時(shí)轉(zhuǎn)發(fā)了一些比較好的WebFlux的文章,因?yàn)槲易罱趯W(xué)。

          我之前也說過,學(xué)習(xí)一項(xiàng)技術(shù)之前,先要了解為什么要學(xué)這項(xiàng)技術(shù)。其實(shí)這次學(xué)習(xí)WebFlux也沒有多大的原生動(dòng)力,主要是在我們組內(nèi)會(huì)輪流做一次技術(shù)分享,而我又不知道分享什么比較好…

          之前在初學(xué)大數(shù)據(jù)相關(guān)的知識(shí),但是這一塊的時(shí)間線會(huì)拉得比較長,感覺趕不及小組內(nèi)分享(而組內(nèi)的同學(xué)又大部分都懂大數(shù)據(jù),就只有我一個(gè)菜雞,淚目)。所以,想的是:“要不我學(xué)點(diǎn)新東西搞搞?”。于是就花了點(diǎn)時(shí)間學(xué)WebFlux啦~

          這篇文章主要講解什么是WebFlux,帶領(lǐng)大家入個(gè)門,希望對(duì)大家有所幫助(至少看完這篇文章,知道WebFlux是干嘛用的)

          一、什么是WebFlux?

          我們從Spring的官網(wǎng)拉下一點(diǎn)點(diǎn)就可以看到介紹WebFlux的地方了

          4fb1e693034501150e5f2b59afc6aa4b.webpWebFlux的簡介

          從官網(wǎng)的簡介中我們能得出什么樣的信息?

          • 我們程序員往往根據(jù)不同的應(yīng)用場景選擇不同的技術(shù),有的場景適合用于同步阻塞的,有的場景適合用于異步非阻塞的。而Spring5提供了一整套響應(yīng)式(非阻塞)的技術(shù)棧供我們使用(包括Web控制器、權(quán)限控制、數(shù)據(jù)訪問層等等)。

          • 而左側(cè)的圖則是技術(shù)棧的對(duì)比啦;

          • 響應(yīng)式一般用Netty或者Servlet 3.1的容器(因?yàn)橹С之惒椒亲枞?,而Servlet技術(shù)棧用的是Servlet容器

          • 在Web端,響應(yīng)式用的是WebFlux,Servlet用的是SpringMVC

          • …..

          總結(jié)起來,WebFlux只是響應(yīng)式編程中的一部分(在Web控制端),所以一般我們用它與SpringMVC來對(duì)比。

          二、如何理解響應(yīng)式編程?

          在上面提到了響應(yīng)式編程(Reactive Programming),而WebFlux只是響應(yīng)式編程的其中一個(gè)技術(shù)棧而已,所以我們先來探討一下什么是響應(yīng)式編程

          從維基百科里邊我們得到的定義:

          reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change

          響應(yīng)式編程(reactive programming)是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式

          在維基百科上也舉了個(gè)小例子:

          97bde36afcbcf35eabc45604ef2bbd53.webp例子

          意思大概如下:

          • 在命令式編程(我們的日常編程模式)下,式子a=b+c,這就意味著a的值是由bc計(jì)算出來的。如果b或者c后續(xù)有變化,不會(huì)影響a的值

          • 在響應(yīng)式編程下,式子a:=b+c,這就意味著a的值是由bc計(jì)算出來的。但如果b或者c的值后續(xù)有變化,會(huì)影響a的值

          我認(rèn)為上面的例子已經(jīng)可以幫助我們理解變化傳遞(propagation of change)

          那數(shù)據(jù)流(data stream)和聲明式(declarative)怎么理解呢?那可以提一提我們的Stream流了。之前寫過Lambda表達(dá)式和Stream流的文章,大家可以先去看看:

          Lambda的語法是這樣的(Stream流的使用會(huì)涉及到很多Lambda表達(dá)式的東西,所以一般先學(xué)Lambda再學(xué)Stream流):

          d51b5da4d080793fc4a27058bdbd0cd3.webp語法

          Stream流的使用分為三個(gè)步驟(創(chuàng)建Stream流、執(zhí)行中間操作、執(zhí)行最終操作):

          3949afa143eb8eac9dcbd073922c3bb5.webp三步走

          執(zhí)行中間操作實(shí)際上就是給我們提供了很多的API去操作Stream流中的數(shù)據(jù)(求和/去重/過濾)等等

          aa6ac04ce7b2dfc123726a9ba0887115.webp中間操作 解釋

          說了這么多,怎么理解數(shù)據(jù)流和聲明式呢?其實(shí)是這樣的:

          • 本來數(shù)據(jù)是我們自行處理的,后來我們把要處理的數(shù)據(jù)抽象出來(變成了數(shù)據(jù)流),然后通過API去處理數(shù)據(jù)流中的數(shù)據(jù)(是聲明式的)

          比如下面的代碼;將數(shù)組中的數(shù)據(jù)變成數(shù)據(jù)流,通過顯式聲明調(diào)用.sum()來處理數(shù)據(jù)流中的數(shù)據(jù),得到最終的結(jié)果:

          public?static?void?main(String[]?args)?{
          ????int[]?nums?=?{?1,?2,?3?};
          ????int?sum2?=?IntStream.of(nums).parallel().sum();
          ????System.out.println("結(jié)果為:"?+?sum2);
          }

          如圖下所示:

          ce6c9b1e04ff6ef36904d53962efa97d.webp數(shù)據(jù)流與聲明式

          2.1 響應(yīng)式編程->異步非阻塞

          上面講了響應(yīng)式編程是什么:

          響應(yīng)式編程(reactive programming)是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式

          也講解了數(shù)據(jù)流/變化傳遞/聲明式是什么意思,但說到響應(yīng)式編程就離不開異步非阻塞。

          從Spring官網(wǎng)介紹WebFlux的信息我們就可以發(fā)現(xiàn)asynchronous, nonblocking 這樣的字樣,因?yàn)?strong style="font-size:inherit;color:rgb(233,105,0);">響應(yīng)式編程它是異步的,也可以理解成變化傳遞它是異步執(zhí)行的。

          如下圖,合計(jì)的金額會(huì)受其他的金額影響(更新的過程是異步的):

          edb3aa45315862fbcd7ba55d0e1e7dd7.webp合計(jì)的錢會(huì)因?yàn)槠渌慕痤~影響

          我們的JDK8 Stream流是同步的,它就不適合用于響應(yīng)式編程(但基礎(chǔ)的用法是需要懂的,因?yàn)轫憫?yīng)式流編程都是操作嘛)

          而在JDK9 已經(jīng)支持響應(yīng)式流了,下面我們來看一下

          三、JDK9 Reactive

          響應(yīng)式流的規(guī)范早已經(jīng)被提出了:里面提到了:

          Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure ? ----->http://www.reactive-streams.org/

          翻譯再加點(diǎn)信息:

          響應(yīng)式流(Reactive Streams)通過定義一組實(shí)體,接口和互操作方法,給出了實(shí)現(xiàn)異步非阻塞背壓的標(biāo)準(zhǔn)。第三方遵循這個(gè)標(biāo)準(zhǔn)來實(shí)現(xiàn)具體的解決方案,常見的有Reactor,RxJava,Akka Streams,Ratpack等。

          規(guī)范里頭實(shí)際上就是定義了四個(gè)接口:

          b9720b025eaa91e29ad528739826917c.webp規(guī)范的四個(gè)接口

          Java 平臺(tái)直到 JDK 9才提供了對(duì)于Reactive的完整支持,JDK9也定義了上述提到的四個(gè)接口,在java.util.concurrent包上

          a639f899909664f37a7efe15447b389a.webpJava的響應(yīng)式流接口

          一個(gè)通用的流處理架構(gòu)一般會(huì)是這樣的(生產(chǎn)者產(chǎn)生數(shù)據(jù),對(duì)數(shù)據(jù)進(jìn)行中間處理,消費(fèi)者拿到數(shù)據(jù)消費(fèi)):

          46fab641de499629f0cecb60893655b9.webp流式處理架構(gòu)
          • 數(shù)據(jù)來源,一般稱為生產(chǎn)者(Producer)

          • 數(shù)據(jù)的目的地,一般稱為消費(fèi)者(Consumer)

          • 在處理時(shí),對(duì)數(shù)據(jù)執(zhí)行某些操作一個(gè)或多個(gè)處理階段。(Processor)

          到這里我們再看回響應(yīng)式流的接口,我們應(yīng)該就能懂了:

          • Publisher(發(fā)布者)相當(dāng)于生產(chǎn)者(Producer)

          • Subscriber(訂閱者)相當(dāng)于消費(fèi)者(Consumer)

          • Processor就是在發(fā)布者與訂閱者之間處理數(shù)據(jù)用的

          在響應(yīng)式流上提到了back pressure(背壓)這么一個(gè)概念,其實(shí)非常好理解。在響應(yīng)式流實(shí)現(xiàn)異步非阻塞是基于生產(chǎn)者和消費(fèi)者模式的,而生產(chǎn)者消費(fèi)者很容易出現(xiàn)的一個(gè)問題就是:生產(chǎn)者生產(chǎn)數(shù)據(jù)多了,就把消費(fèi)者給壓垮了。

          而背壓說白了就是:消費(fèi)者能告訴生產(chǎn)者自己需要多少量的數(shù)據(jù)。這里就是Subscription接口所做的事。

          下面我們來看看JDK9接口的方法,或許就更加能理解上面所說的話了:

          //?發(fā)布者(生產(chǎn)者)
          public?interface?Publisher<T>?{
          ????public?void?subscribe(Subscribersuper?T>?s);
          }
          //?訂閱者(消費(fèi)者)
          public?interface?Subscriber<T>?{
          ????public?void?onSubscribe(Subscription?s);
          ????public?void?onNext(T?t);
          ????public?void?onError(Throwable?t);
          ????public?void?onComplete();
          }
          //?用于發(fā)布者與訂閱者之間的通信(實(shí)現(xiàn)背壓:訂閱者能夠告訴生產(chǎn)者需要多少數(shù)據(jù))
          public?interface?Subscription?{
          ????public?void?request(long?n);
          ????public?void?cancel();
          }
          //?用于處理發(fā)布者?發(fā)布消息后,對(duì)消息進(jìn)行處理,再交由消費(fèi)者消費(fèi)
          public?interface?Processor<T,R>?extends?Subscriber<T>,?Publisher<R>?{
          }

          3.1 看個(gè)例子

          代碼中有大量的注釋,我就不多BB了,建議直接復(fù)制跑一下看看:

          class?MyProcessor?extends?SubmissionPublisher<String>
          ????????implements?Processor<Integer,?String>?
          {

          ????private?Subscription?subscription;

          ????@Override
          ????public?void?onSubscribe(Subscription?subscription)?{
          ????????//?保存訂閱關(guān)系,?需要用它來給發(fā)布者響應(yīng)
          ????????this.subscription?=?subscription;

          ????????//?請求一個(gè)數(shù)據(jù)
          ????????this.subscription.request(1);
          ????}

          ????@Override
          ????public?void?onNext(Integer?item)?{
          ????????//?接受到一個(gè)數(shù)據(jù),?處理
          ????????System.out.println("處理器接受到數(shù)據(jù):?"?+?item);

          ????????//?過濾掉小于0的,?然后發(fā)布出去
          ????????if?(item?>?0)?{
          ????????????this.submit("轉(zhuǎn)換后的數(shù)據(jù):"?+?item);
          ????????}

          ????????//?處理完調(diào)用request再請求一個(gè)數(shù)據(jù)
          ????????this.subscription.request(1);

          ????????//?或者?已經(jīng)達(dá)到了目標(biāo),?調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
          ????????//?this.subscription.cancel();
          ????}

          ????@Override
          ????public?void?onError(Throwable?throwable)?{
          ????????//?出現(xiàn)了異常(例如處理數(shù)據(jù)的時(shí)候產(chǎn)生了異常)
          ????????throwable.printStackTrace();

          ????????//?我們可以告訴發(fā)布者,?后面不接受數(shù)據(jù)了
          ????????this.subscription.cancel();
          ????}

          ????@Override
          ????public?void?onComplete()?{
          ????????//?全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
          ????????System.out.println("處理器處理完了!");
          ????????//?關(guān)閉發(fā)布者
          ????????this.close();
          ????}

          }

          public?class?FlowDemo2?{

          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????//?1.?定義發(fā)布者,?發(fā)布的數(shù)據(jù)類型是?Integer
          ????????//?直接使用jdk自帶的SubmissionPublisher
          ????????SubmissionPublisher?publiser?=?new?SubmissionPublisher();

          ????????//?2.?定義處理器,?對(duì)數(shù)據(jù)進(jìn)行過濾,?并轉(zhuǎn)換為String類型
          ????????MyProcessor?processor?=?new?MyProcessor();

          ????????//?3.?發(fā)布者?和?處理器?建立訂閱關(guān)系
          ????????publiser.subscribe(processor);

          ????????//?4.?定義最終訂閱者,?消費(fèi)?String?類型數(shù)據(jù)
          ????????Subscriber?subscriber?=?new?Subscriber()?{

          ????????????private?Subscription?subscription;

          ????????????@Override
          ????????????public?void?onSubscribe(Subscription?subscription)?{
          ????????????????//?保存訂閱關(guān)系,?需要用它來給發(fā)布者響應(yīng)
          ????????????????this.subscription?=?subscription;

          ????????????????//?請求一個(gè)數(shù)據(jù)
          ????????????????this.subscription.request(1);
          ????????????}

          ????????????@Override
          ????????????public?void?onNext(String?item)?{
          ????????????????//?接受到一個(gè)數(shù)據(jù),?處理
          ????????????????System.out.println("接受到數(shù)據(jù):?"?+?item);

          ????????????????//?處理完調(diào)用request再請求一個(gè)數(shù)據(jù)
          ????????????????this.subscription.request(1);

          ????????????????//?或者?已經(jīng)達(dá)到了目標(biāo),?調(diào)用cancel告訴發(fā)布者不再接受數(shù)據(jù)了
          ????????????????//?this.subscription.cancel();
          ????????????}

          ????????????@Override
          ????????????public?void?onError(Throwable?throwable)?{
          ????????????????//?出現(xiàn)了異常(例如處理數(shù)據(jù)的時(shí)候產(chǎn)生了異常)
          ????????????????throwable.printStackTrace();

          ????????????????//?我們可以告訴發(fā)布者,?后面不接受數(shù)據(jù)了
          ????????????????this.subscription.cancel();
          ????????????}

          ????????????@Override
          ????????????public?void?onComplete()?{
          ????????????????//?全部數(shù)據(jù)處理完了(發(fā)布者關(guān)閉了)
          ????????????????System.out.println("處理完了!");
          ????????????}

          ????????};

          ????????//?5.?處理器?和?最終訂閱者?建立訂閱關(guān)系
          ????????processor.subscribe(subscriber);

          ????????//?6.?生產(chǎn)數(shù)據(jù),?并發(fā)布
          ????????publiser.submit(-111);
          ????????publiser.submit(111);

          ????????//?7.?結(jié)束后?關(guān)閉發(fā)布者
          ????????//?正式環(huán)境?應(yīng)該放?finally?或者使用?try-resouce?確保關(guān)閉
          ????????publiser.close();

          ????????//?主線程延遲停止,?否則數(shù)據(jù)沒有消費(fèi)就退出
          ????????Thread.currentThread().join(1000);
          ????}

          }

          輸出的結(jié)果如下:

          2de7f8c2ef29d9d7fe7f86127147b3a9.webp輸出的結(jié)果

          流程實(shí)際上非常簡單的:

          b02d4f293a20332f118f4a3f32c094e6.webp流程

          參考資料:

          • https://yanbin.blog/java-9-talk-reactive-stream/#more-8877

          • https://blog.csdn.net/wudaoshihun/article/details/83070086

          • http://www.spring4all.com/article/6826

          • https://www.cnblogs.com/IcanFixIt/p/7245377.html

          Java 8 的 Stream 主要關(guān)注在流的過濾,映射,合并,而 ?Reactive Stream 更進(jìn)一層,側(cè)重的是流的產(chǎn)生與消費(fèi),即流在生產(chǎn)與消費(fèi)者之間的協(xié)調(diào)

          說白了就是:響應(yīng)式流是異步非阻塞+流量控制的(可以告訴生產(chǎn)者自己需要多少的量/取消訂閱關(guān)系)

          展望響應(yīng)式編程的場景應(yīng)用:

          比如一個(gè)日志監(jiān)控系統(tǒng),我們的前端頁面將不再需要通過“命令式”的輪詢的方式不斷向服務(wù)器請求數(shù)據(jù)然后進(jìn)行更新,而是在建立好通道之后,數(shù)據(jù)流從系統(tǒng)源源不斷流向頁面,從而展現(xiàn)實(shí)時(shí)的指標(biāo)變化曲線;

          再比如一個(gè)社交平臺(tái),朋友的動(dòng)態(tài)、點(diǎn)贊和留言不是手動(dòng)刷出來的,而是當(dāng)后臺(tái)數(shù)據(jù)變化的時(shí)候自動(dòng)體現(xiàn)到界面上的。

          四、入門WebFlux

          扯了一大堆,終于回到WebFlux了。經(jīng)過上面的基礎(chǔ),我們現(xiàn)在已經(jīng)能夠得出一些結(jié)論的了:

          • WebFlux是Spring推出響應(yīng)式編程的一部分(web端)

          • 響應(yīng)式編程是異步非阻塞的(是一種基于數(shù)據(jù)流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程范式)

          我們再回來看官網(wǎng)的圖:

          1a877bf5ecd5e690bafc204ef42fa85b.webpmvc or webflux

          4.1 簡單體驗(yàn)WebFlux

          Spring官方為了讓我們更加快速/平滑到WebFlux上,之前SpringMVC那套都是支持的。也就是說:我們可以像使用SpringMVC一樣使用著WebFlux。

          b31615b4953c1b0bf877ae72217e4d2c.webp支持SpringMVC那套

          WebFlux使用的響應(yīng)式流并不是用JDK9平臺(tái)的,而是一個(gè)叫做Reactor響應(yīng)式流庫。所以,入門WebFlux其實(shí)更多是了解怎么使用Reactor的API,下面我們來看看~

          Reactor是一個(gè)響應(yīng)式流,它也有對(duì)應(yīng)的發(fā)布者(Publisher ),Reactor的發(fā)布者用兩個(gè)類來表示:

          • Mono(返回0或1個(gè)元素)

          • Flux(返回0-n個(gè)元素)

          而消費(fèi)者則是Spring框架幫我們去完成

          下面我們來看一個(gè)簡單的例子(基于WebFlux環(huán)境構(gòu)建):

          //?阻塞5秒鐘
          private?String?createStr()?{
          ????try?{
          ????????TimeUnit.SECONDS.sleep(5);
          ????}?catch?(InterruptedException?e)?{
          ????}
          ????return?"some?string";
          }

          //?普通的SpringMVC方法
          @GetMapping("/1")
          private?String?get1()?{
          ????log.info("get1?start");
          ????String?result?=?createStr();
          ????log.info("get1?end.");
          ????return?result;
          }

          //?WebFlux(返回的是Mono)
          @GetMapping("/2")
          private?Mono?get2()?{
          ????log.info("get2?start");
          ????Mono?result?=?Mono.fromSupplier(()?->?createStr());
          ????log.info("get2?end.");
          ????return?result;
          }

          首先,值得說明的是,我們構(gòu)建WebFlux環(huán)境啟動(dòng)時(shí),應(yīng)用服務(wù)器默認(rèn)是Netty的:

          328a85f3100a4b30b0b5bf3f4fa4e2aa.webp基于Netty

          我們分別來訪問一下SpringMVC的接口和WebFlux的接口,看一下有什么區(qū)別:

          SpringMVC:

          cc31a368b2363e3decfd09eb088116d9.webpSpringMVC

          WebFlux:

          ec77d80be6e3b41a86dcf3469e200b3d.webpWebFlux

          從調(diào)用者(瀏覽器)的角度而言,是感知不到有什么變化的,因?yàn)槎际堑玫却?s才返回?cái)?shù)據(jù)。但是,從服務(wù)端的日志我們可以看出,WebFlux是直接返回Mono對(duì)象的(而不是像SpringMVC一直同步阻塞5s,線程才返回)。

          這正是WebFlux的好處:能夠以固定的線程來處理高并發(fā)(充分發(fā)揮機(jī)器的性能)。

          WebFlux還支持服務(wù)器推送(SSE - >Server Send Event),我們來看個(gè)例子:

          /**
          ?????*?Flux?:?返回0-n個(gè)元素
          ?????*?注:需要指定MediaType
          ?????*?@return
          ?????*/

          @GetMapping(value?=?"/3",?produces?=?MediaType.TEXT_EVENT_STREAM_VALUE)
          private?Flux?flux()?{
          ????Flux?result?=?Flux
          ????????.fromStream(IntStream.range(1,?5).mapToObj(i?->?{
          ????????????try?{
          ????????????????TimeUnit.SECONDS.sleep(1);
          ????????????}?catch?(InterruptedException?e)?{
          ????????????}
          ????????????return?"flux?data--"?+?i;
          ????????}));
          ????return?result;
          }

          效果就是每秒會(huì)給瀏覽器推送數(shù)據(jù):

          1dd3104a10c9b039e20830846b6cdb5a.webp服務(wù)器推送

          WebFlux我還沒寫完,這篇寫了WebFlux支持SpringMVC那套注解來開發(fā),下篇寫寫如何使用WebFlux另一種模式(Functional Endpoints)來開發(fā)以及一些常見的問題還需要補(bǔ)充一下~


          兩年嘔心瀝血的文章「面試題」「基礎(chǔ)」「進(jìn)階」這里全都有!

          300多篇原創(chuàng)技術(shù)文章海量視頻資源精美腦圖面試題

          長按掃碼可關(guān)注獲取?

          在看和分享對(duì)我非常重要!febdad5ad9f65501b6f485dad0fc554a.webp

          創(chuàng)作不易,各位的支持和認(rèn)可,就是我創(chuàng)作的最大動(dòng)力,我們下篇文章見!?求點(diǎn)贊?求關(guān)注???求分享??求留言?

          6c144579482706c8715e9afc0a0bc7ad.webp

          點(diǎn)擊閱讀原文,關(guān)注我的GitHub

          瀏覽 44
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  超碰在线人妻少妇 | 一二三久久 | BB在线视频网站 | 台湾一区二区三区在线 | 偷拍福利视频 |