<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Project Reactor 響應式編程

          共 12519字,需瀏覽 26分鐘

           ·

          2021-06-27 23:44

          一. 什么是響應式編程?

          在計算中,響應式編程或反應式編程是一種面向數(shù)據(jù)流和變化傳播的聲明式編程范式。這意味著可以在編程語言中很方便地表達靜態(tài)或動態(tài)的數(shù)據(jù)流,而相關(guān)的計算模型會自動將變化的值通過數(shù)據(jù)流進行傳播。

          上面一段話來自維基百科。

          響應式編程顧名思義就是在于響應二字,我們需要在某個事件發(fā)生時做出響應。

          我們現(xiàn)實生活就是對響應式很好的解釋,我們?nèi)祟惖呐e動大多都是基于事件驅(qū)動模式,當有人呼喊你的名字,你會根據(jù)這個事件來判斷要不要進行應答,這個過程其實就是產(chǎn)生事件,然后我們作為消費者對事件進行處理,而我們的處理結(jié)果也會繼續(xù)向下傳遞。

          在響應式編程中,通常是采用異步回調(diào)的方式,回調(diào)方法的調(diào)用和控制則會由響應式框架來完成,對于應用開發(fā)來說只需要關(guān)注回調(diào)方法的實現(xiàn)就可以了。

          這里提一個著名的設計原則:好萊塢原則(Hollywood principle)

          Don't call us, we will call you.

          演員提交簡歷之后,回家等著就好,演藝公司會主動打電話給你。

          二. Project Reactor介紹

          Java中最早的Reactor庫RxJava借鑒于.Net的Reactor Extensions,后來Jdk在Java9提供了標準化的響應式庫實現(xiàn)java.util.concurrent.Flow,再后來,Project Reactor作為第四代響應式編程框架出現(xiàn),它是一個完全非阻塞響應式編程的基石,直接集成了Java函數(shù)式API,特別是CompletableFuture,Stream和Duration。Reactor Netty實現(xiàn)了非阻塞跨進程通信,提升了服務間通信效率。

          我們在平常開發(fā)中,異步編程無非是使用JUC包下的工具類或者一些Java同步語義。

          ?阻塞等待:如 Future.get()?不安全的數(shù)據(jù)訪問:如 ReentrantLock.lock()?異常冒泡:如 try…catch…finally?同步阻塞:如 synchronized{ }?Wrapper分配(GC 壓力):如 new Wrapper(event)

          或者自定義線程池,但也會遇到諸如以下的問題。

          ?Callable 分配 -- 可能導致 GC 壓力。?同步過程強制每個線程執(zhí)行停-檢查操作。?消息的消費可能比生產(chǎn)慢。?使用線程池(ThreadPool)將任務傳遞給目標線程 -- 通過 FutureTask 方式肯定會產(chǎn)生 GC 壓力。?阻塞直至IO回調(diào)。

          上面等等問題都會造成的系統(tǒng)性能瓶頸或者安全問題,在Future.get時我們無法避免阻塞等待,最差情況下程序運行其實還是同步的,使用Reactor不但可以很有效的解決上述問題,還能讓我們寫出更加簡潔明了的代碼。

          三. Reactor核心概念

          代碼: https://github.com/CasterWx/reactor-ppt

          Flux

          Flux 表示的是包含 0 到 N 個元素的異步序列。在該序列中可以包含三種不同類型的消息通知:正常的包含元素的消息、序列結(jié)束的消息和序列出錯的消息。當消息通知產(chǎn)生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調(diào)用。

          1. just()

          可以指定序列中包含的全部元素。創(chuàng)建出來的 Flux 序列在發(fā)布這些元素之后會自動結(jié)束。

          Flux.just("hello", "world")
          .doOnNext((i) -> {
          System.out.println("[doOnNext] " + i);
          })
          .doOnComplete(() -> System.out.println("[doOnComplete]"))
          .subscribe(i -> System.out.println("[subscribe] " + i));

          // 執(zhí)行結(jié)果
          [doOnNext] hello
          [subscribe] hello
          [doOnNext] world
          [subscribe] world
          [doOnComplete]

          2. fromArray(),fromIterable()和 fromStream()

          可以從一個數(shù)組、Iterable 對象或 Stream 對象中創(chuàng)建 Flux 對象。

          List<String> arr = Arrays.asList("flux", "mono", "reactor", "core");
          Flux.fromIterable(arr)
          .doOnNext((i) -> {
          System.out.println("[doOnNext] " + i);
          })
          .subscribe((i) -> {
          System.out.println("[subscribe] " + i);
          });
          //執(zhí)行結(jié)果
          [doOnNext] flux
          [subscribe] flux
          [doOnNext] mono
          [subscribe] mono
          [doOnNext] reactor
          [subscribe] reactor
          [doOnNext] core
          [subscribe] core

          3. empty()

          創(chuàng)建一個不包含任何元素,只發(fā)布結(jié)束消息的序列。

           Flux.empty()
          .doOnNext(i -> {
          System.out.println("[doOnNext] " + i);
          }).doOnComplete(() -> {
          System.out.println("[DoOnComplete] ");
          }).subscribe(i -> {
          System.out.println("[subscribe] " + i);
          });
          //執(zhí)行結(jié)果
          [DoOnComplete]

          4. error(Throwable error)

          創(chuàng)建一個只包含錯誤消息的序列。

          try {
          int []arr = new int[5];
          arr[10] = 2;
          } catch (Exception e) {
          Flux.error(e).subscribe(i -> {
          System.out.println("error subscribe");
          });
          }
          //執(zhí)行結(jié)果

          5. never()

          創(chuàng)建一個不包含任何消息通知的序列。

          Flux.never()
          .doOnNext(i -> {
          System.out.println("doOnNext " + i);
          }).doOnComplete(() -> {
          System.out.println("doOnComplete");
          }).subscribe((i) -> {
          System.out.println("subscribe " + i);
          });
          //執(zhí)行結(jié)果

          6. range(int start, int count)

          創(chuàng)建包含從 start 起始的 count 個數(shù)量的 Integer 對象的序列。

          Flux.range(5, 10)
          .doOnNext(i -> {
          System.out.println("doOnNext " + i);
          }).doOnComplete(() -> {
          System.out.println("doOnComplete");
          }).subscribe((i) -> {
          System.out.println("subscribe " + i);
          });
          //執(zhí)行結(jié)果
          doOnNext 5
          subscribe 5
          doOnNext 6
          subscribe 6
          doOnNext 7
          subscribe 7
          doOnNext 8
          subscribe 8
          doOnNext 9
          subscribe 9
          doOnNext 10
          subscribe 10
          doOnNext 11
          subscribe 11
          doOnNext 12
          subscribe 12
          doOnNext 13
          subscribe 13
          doOnNext 14
          subscribe 14
          doOnComplete

          7. interval(Duration period)和 interval(Duration delay, Duration period)

          創(chuàng)建一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間隔來發(fā)布。除了間隔時間之外,還可以指定起始元素發(fā)布之前的延遲時間。

          Flux.interval(Duration.ofSeconds(4), Duration.ofSeconds(2))
          .doOnNext(i -> {
          System.out.println("doOnNext " + i);
          }).doOnComplete(() -> {
          System.out.println("doOnComplete " + new Date());
          }).subscribe((i) -> {
          System.out.println("subscribe " + i + ", date: " + new Date());
          });
          try {
          Thread.sleep(10000);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          //執(zhí)行結(jié)果
          doOnNext 0
          subscribe 0, date: Fri Jun 25 10:17:56 CST 2021
          doOnNext 1
          subscribe 1, date: Fri Jun 25 10:17:58 CST 2021
          doOnNext 2
          subscribe 2, date: Fri Jun 25 10:18:00 CST 2021
          doOnNext 3
          subscribe 3, date: Fri Jun 25 10:18:02 CST 2021

          上面實例為什么沒有輸出doOnComplete, 從第四秒開始,每兩秒生產(chǎn)一個元素,等到最后complete時已經(jīng)到了sleep的十秒時間,主線程main已經(jīng)推出。

          8. intervalMillis(long period)和 intervalMillis(long delay, long period)

          與 interval()方法的作用相同,只不過該方法通過毫秒數(shù)來指定時間間隔和延遲時間。

          Mono

          Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。

          1. fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier()

          分別從 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中創(chuàng)建 Mono。

           Mono.fromCallable(() -> {
          System.out.println("begin callable");
          return "Hello";
          })
          .subscribeOn(Schedulers.elastic())
          .doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
          .subscribe(System.out::println);
          Thread.sleep(10000);
          //執(zhí)行結(jié)果
          begin callable
          doOnNext Hello, thread :elastic-2
          Hello
          Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
          System.out.println("begin");
          return "hello";
          }))
          .subscribeOn(Schedulers.elastic())
          .doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
          .subscribe(System.out::println);
          Thread.sleep(10000);
          //執(zhí)行結(jié)果
          begin
          doOnNext hello, thread :elastic-2
          hello

          2. delay(Duration duration)和 delayMillis(long duration)

          創(chuàng)建一個 Mono 序列,在指定的延遲時間之后,產(chǎn)生數(shù)字 0 作為唯一值。

          Mono.delay(Duration.ofSeconds(1)).subscribe(System.out::println);
          Thread.sleep(3000);
          //執(zhí)行結(jié)果, 延遲一秒后打印
          0

          3. ignoreElements(Publisher source)

          創(chuàng)建一個 Mono 序列,忽略作為源的 Publisher 中的所有元素,只產(chǎn)生結(jié)束消息。

          Mono.ignoreElements((i) -> {
          System.out.println("ignoreElements");
          })
          .doOnNext((i) -> System.out.println("doOnNext " + i))
          .subscribe(System.out::println);
          //執(zhí)行結(jié)果
          ignoreElements

          4. justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data)

          從一個 Optional 對象或可能為 null 的對象中創(chuàng)建 Mono。只有 Optional 對象中包含值或?qū)ο蟛粸?null 時,Mono 序列才產(chǎn)生對應的元素。

          Optional<Integer> optional = Optional.empty();
          Mono.justOrEmpty(optional)
          .doOnNext((i) -> System.out.println("doOnNext " + i))
          .subscribe(System.out::println);

          System.out.println("========");

          optional = Optional.of(100);
          Mono.justOrEmpty(optional)
          .doOnNext((i) -> System.out.println("doOnNext " + i))
          .subscribe(System.out::println);
          //執(zhí)行結(jié)果
          ========
          doOnNext 100
          100

          操作符

          1. buffer 和 bufferTimeout

          這兩個操作符的作用是把當前流中的元素收集到集合中,并把集合對象作為流中的新元素。在進行收集時可以指定不同的條件:所包含的元素的最大數(shù)量或收集的時間間隔。方法 buffer()僅使用一個條件,而 bufferTimeout()可以同時指定兩個條件。指定時間間隔時可以使用 Duration 對象或毫秒數(shù),即使用 bufferMillis()或 bufferTimeoutMillis()兩個方法。

          除了元素數(shù)量和時間間隔之外,還可以通過 bufferUntil 和 bufferWhile 操作符來進行收集。這兩個操作符的參數(shù)是表示每個集合中的元素所要滿足的條件的 Predicate 對象。bufferUntil 會一直收集直到 Predicate 返回為 true。使得 Predicate 返回 true 的那個元素可以選擇添加到當前集合或下一個集合中;bufferWhile 則只有當 Predicate 返回 true 時才會收集。一旦值為 false,會立即開始下一次收集。

          Flux.range(1, 100).buffer(20).subscribe(System.out::println);

          //執(zhí)行結(jié)果
          [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
          [21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
          [41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
          [61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
          [81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]

          2. filter

          對流中包含的元素進行過濾,只留下滿足 Predicate 指定條件的元素。

          Flux.range(1, 10)
          .filter(i -> i%2==0)
          .doOnNext(i -> {
          System.out.println("[doOnNext] " + i);
          })
          .subscribe(i -> {
          System.out.println("subscribe " + i);
          });
          //執(zhí)行結(jié)果
          [doOnNext] 2
          subscribe 2
          [doOnNext] 4
          subscribe 4
          [doOnNext] 6
          subscribe 6
          [doOnNext] 8
          subscribe 8
          [doOnNext] 10
          subscribe 10

          3. window

          window 操作符的作用類似于 buffer,所不同的是 window 操作符是把當前流中的元素收集到另外的 Flux 序列中,因此返回值類型是 Flux

          Flux.range(1, 15).window(5)
          .doOnNext((flux -> {}))
          .subscribe(flux -> {
          flux.doOnNext((item) -> {
          System.out.println("[window] flux: " + item);
          })
          .doOnComplete(() -> System.out.println("flux item complete"))
          .subscribe();
          });
          // 執(zhí)行結(jié)果
          [window] flux: 1
          [window] flux: 2
          [window] flux: 3
          [window] flux: 4
          [window] flux: 5
          flux item complete
          [window] flux: 6
          [window] flux: 7
          [window] flux: 8
          [window] flux: 9
          [window] flux: 10
          flux item complete
          [window] flux: 11
          [window] flux: 12
          [window] flux: 13
          [window] flux: 14
          [window] flux: 15
          flux item complete

          4. zipWith

          zipWith 操作符把當前流中的元素與另外一個流中的元素按照一對一的方式進行合并。在合并時可以不做任何處理,由此得到的是一個元素類型為 Tuple2 的流;也可以通過一個 BiFunction 函數(shù)對合并的元素進行處理,所得到的流的元素類型為該函數(shù)的返回值。

          Flux.just("Hello", "Project")
          .zipWith(Flux.just("World", "Reactor"))
          .subscribe(System.out::println);

          System.out.println("======");

          Flux.just("Hello", "Project")
          .zipWith(Flux.just("World", "Reactor"), (s1, s2) -> String.format("%s!%s!", s1, s2))
          .subscribe(System.out::println);
          // 執(zhí)行結(jié)果
          Hello,World
          Project,Reactor
          ======
          Hello!World!
          Project!Reactor!

          5. take

          take 系列操作符用來從當前流中提取元素。提取的方式可以有很多種。

          take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的數(shù)量或時間間隔來提取。

          Flux.range(1, 10).take(2).subscribe(System.out::println);
          // 執(zhí)行結(jié)果
          1
          2

          1.takeLast(long n):提取流中的最后 N 個元素。

          Flux.range(1, 10).takeLast(2).subscribe(System.out::println);
          // 執(zhí)行結(jié)果
          9
          10

          1.takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。

          Flux.range(1, 10).takeUntil(i -> i == 6).subscribe(System.out::println);
          // 執(zhí)行結(jié)果
          1
          2
          3
          4
          5
          6

          1.takeWhile(Predicate<? super T> continuePredicate):當 Predicate 返回 true 時才進行提取。

          Flux.range(1, 10).takeWhile(i -> i < 5).subscribe(System.out::println);
          // 執(zhí)行結(jié)果
          1
          2
          3
          4

          1.takeUntilOther(Publisher<?> other):提取元素直到另外一個流開始產(chǎn)生元素。

          Flux.range(1, 5).takeUntilOther((i) -> {
          try {
          Thread.sleep(1000);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          }).subscribe(System.out::println);
          // 執(zhí)行結(jié)果,暫停1000ms后開始輸出
          1
          2
          3
          4
          5

          6. reduce 和 reduceWith

          reduce 和 reduceWith 操作符對流中包含的所有元素進行累積操作,得到一個包含計算結(jié)果的 Mono 序列。累積操作是通過一個 BiFunction 來表示的。在操作時可以指定一個初始值。如果沒有初始值,則序列的第一個元素作為初始值。

          Flux.range(1, 10)
          .reduce((x, y) -> {
          System.out.println("x:" + x + ", y:" + y);
          return x+y;
          })
          .subscribe(System.out::println);
          // 執(zhí)行結(jié)果
          x:1, y:2
          x:3, y:3
          x:6, y:4
          x:10, y:5
          x:15, y:6
          x:21, y:7
          x:28, y:8
          x:36, y:9
          x:45, y:10
          55
          Flux.range(1, 10)
          .reduceWith(() -> 100, (x, y) -> {
          System.out.println("x:" + x + ", y:" + y);
          return x+y;
          })
          .subscribe(System.out::println);
          // 執(zhí)行結(jié)果
          x:100, y:1
          x:101, y:2
          x:103, y:3
          x:106, y:4
          x:110, y:5
          x:115, y:6
          x:121, y:7
          x:128, y:8
          x:136, y:9
          x:145, y:10
          155

          7. merge 和 mergeSequential

          merge 和 mergeSequential 操作符用來把多個流合并成一個 Flux 序列。不同之處在于 merge 按照所有流中元素的實際產(chǎn)生順序來合并,而 mergeSequential 則按照所有流被訂閱的順序,以流為單位進行合并。

          Flux.merge(Flux.interval(
          Duration.of(0, ChronoUnit.MILLIS),
          Duration.of(100, ChronoUnit.MILLIS)).take(2),
          Flux.interval(
          Duration.of(50, ChronoUnit.MILLIS),
          Duration.of(100, ChronoUnit.MILLIS)).take(2))
          .toStream()
          .forEach(System.out::println);
          System.out.println("==============");
          Flux.mergeSequential(Flux.interval(
          Duration.of(0, ChronoUnit.MILLIS),
          Duration.of(100, ChronoUnit.MILLIS)).take(2),
          Flux.interval(
          Duration.of(50, ChronoUnit.MILLIS),
          Duration.of(100, ChronoUnit.MILLIS)).take(2))
          .toStream()
          .forEach(System.out::println);
          // 執(zhí)行結(jié)果
          0
          0
          1
          1
          ==============
          0
          1
          0
          1

          8. flatMap 和 flatMapSequential

          flatMap 和 flatMapSequential 操作符把流中的每個元素轉(zhuǎn)換成一個流,再把所有流中的元素進行合并。flatMapSequential 和 flatMap 之間的區(qū)別與 mergeSequential 和 merge 之間的區(qū)別是一樣的。

          Flux.just(1, 2)
          .flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS), Duration.of(10, ChronoUnit.MILLIS)).take(x))
          .toStream()
          .forEach(System.out::println);
          // 執(zhí)行結(jié)果
          0
          0
          1

          9. concatMap 和 combineLatest

          concatMap 操作符的作用也是把流中的每個元素轉(zhuǎn)換成一個流,再把所有流進行合并。與 flatMap 不同的是,concatMap 會根據(jù)原始流中的元素順序依次把轉(zhuǎn)換之后的流進行合并;與 flatMapSequential 不同的是,concatMap 對轉(zhuǎn)換之后的流的訂閱是動態(tài)進行的,而 flatMapSequential 在合并之前就已經(jīng)訂閱了所有的流。

          Flux.just(5, 10)
          .concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
          .toStream()
          .forEach(System.out::println);

          Flux.combineLatest(
          Arrays::toString,
          Flux.intervalMillis(100).take(5),
          Flux.intervalMillis(50, 100).take(5)
          ).toStream().forEach(System.out::println);

          四. 結(jié)束

          上文已經(jīng)簡單介紹了Reactor的兩個核心概念Flux和Mono,以及一些常用操作符的使用,剛開始使用響應式編程范式對于部分開發(fā)人員來說可能極度困難,但熟能生巧,長期使用讓思維方式轉(zhuǎn)變才能領(lǐng)會到響應式編程的優(yōu)點。


          瀏覽 65
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费的A片视频 | 日本香港台湾三级无码 | 欧美大吊操逼 | 成人无码小视频 | 男女激情操逼一区福利网站 |