
Flink Source Code Deep Dive: How Async I/O Is Implemented


          2022-02-26 21:32

Hi, I'm Wang Zhiwu (王知無), an original author in the big data field.

How to use Async I/O

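To use Async I/O in Flink, you need a client that supports asynchronous requests; alternatively, you can turn synchronous calls into asynchronous ones by running them on multiple threads.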
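For the second option, here is a minimal sketch: run the blocking call on a dedicated thread pool and hand back a `CompletableFuture`. `BlockingClient.lookup` is a hypothetical synchronous client method invented for illustration, and the pool size of 4 is an arbitrary assumption; the pool size effectively bounds how many blocking calls can run at the same time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical synchronous client: lookup() blocks the calling thread.
class BlockingClient {
    String lookup(String key) {
        try {
            Thread.sleep(50); // simulate network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-of-" + key;
    }
}

// Wraps the blocking client so that callers get a future immediately.
class AsyncClientAdapter implements AutoCloseable {
    private final BlockingClient client = new BlockingClient();
    // Dedicated pool so the blocking calls never run on the caller's thread.
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    CompletableFuture<String> query(String key) {
        return CompletableFuture.supplyAsync(() -> client.lookup(key), pool);
    }

    @Override
    public void close() {
        pool.shutdown(); // already submitted lookups still finish
    }

    public static void main(String[] args) {
        try (AsyncClientAdapter adapter = new AsyncClientAdapter()) {
            CompletableFuture<String> a = adapter.query("a");
            CompletableFuture<String> b = adapter.query("b");
            // Both lookups run concurrently; join() waits for each result.
            System.out.println(a.join() + ", " + b.join());
        }
    }
}
```

Inside `asyncInvoke`, one would then attach a callback to the returned future that calls `resultFuture.complete(...)`, which is the pattern the official example uses.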

Take the example given in the official documentation:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // DatabaseClient, host, port and credentials are placeholders for a real async client
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept((String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation: output mode (unordered), timeout (1000 ms),
// and the maximum number of in-flight async requests (capacity = 100)
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

AsyncDataStream offers two methods, orderedWait and unorderedWait, which correspond to the ordered and unordered output modes.

Two output modes exist because the completion time of an asynchronous request is not deterministic: a request issued earlier may complete later than one issued after it.

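• In "ordered" mode, results are emitted in exactly the order in which the records arrived;
• In "unordered" mode, the emission order follows the order in which requests complete: results of requests that finish first are emitted first.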
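The difference between the two modes can be reproduced with a small single-threaded simulation. It is an illustration, not Flink code: requests are numbered 0..n-1 in arrival order, `completionOrder` is an assumed order in which their replies arrive, ordered mode releases a result only when every earlier request has finished, and unordered mode releases each result as soon as it arrives.

```java
import java.util.ArrayList;
import java.util.List;

class EmissionOrderDemo {
    // Ordered mode: a result leaves only when every earlier request has also finished.
    static List<Integer> ordered(int n, int[] completionOrder) {
        boolean[] done = new boolean[n];
        List<Integer> emitted = new ArrayList<>();
        int head = 0; // oldest request that has not been emitted yet
        for (int id : completionOrder) {
            done[id] = true;
            // Drain from the head for as long as the head request is complete.
            while (head < n && done[head]) {
                emitted.add(head++);
            }
        }
        return emitted;
    }

    // Unordered mode: a result leaves as soon as its request completes.
    static List<Integer> unordered(int[] completionOrder) {
        List<Integer> emitted = new ArrayList<>();
        for (int id : completionOrder) {
            emitted.add(id);
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] completion = {3, 1, 0, 2}; // request 3 answers first, request 2 last
        System.out.println("ordered:   " + ordered(4, completion)); // [0, 1, 2, 3]
        System.out.println("unordered: " + unordered(completion)); // [3, 1, 0, 2]
    }
}
```

In the ordered run, the results of requests 3 and 1 wait until request 0 finishes, which is the head-of-line blocking that the ordered queue described later has to implement.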

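Note that when event time is used, the unordered mode still handles watermarks correctly: the results of records between two watermarks may be emitted in any order, but no record that arrives after a watermark can be emitted before the records that precede that watermark.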

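Because the completion time of an asynchronous request is uncertain, you need to set a request timeout and configure the maximum number of asynchronous requests that may be in flight at the same time.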
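The two limits can be mimicked with plain JDK classes to see what they do. This sketch assumes Java 9+ (for `CompletableFuture.orTimeout`); the `Semaphore` stands in for the capacity limit and the `Supplier` for whatever issues the external request. It is not how AsyncWaitOperator enforces these limits; the operator uses a bounded StreamElementQueue and processing-time timers, as the source code below shows.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class BoundedAsyncCaller {
    private final Semaphore permits;  // at most `capacity` requests in flight
    private final long timeoutMillis; // per-request timeout

    BoundedAsyncCaller(int capacity, long timeoutMillis) {
        this.permits = new Semaphore(capacity);
        this.timeoutMillis = timeoutMillis;
    }

    // Blocks the caller while `capacity` requests are in flight, then issues the request.
    <T> CompletableFuture<T> call(Supplier<CompletableFuture<T>> issueRequest) throws InterruptedException {
        permits.acquire();
        CompletableFuture<T> request;
        try {
            request = issueRequest.get();
        } catch (RuntimeException e) {
            permits.release(); // the request was never issued, so give the slot back
            throw e;
        }
        return request
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)     // fails with TimeoutException
            .whenComplete((value, error) -> permits.release()); // free the slot in every case
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedAsyncCaller caller = new BoundedAsyncCaller(2, 100);
        CompletableFuture<String> fast = caller.call(() -> CompletableFuture.completedFuture("ok"));
        CompletableFuture<String> stuck = caller.call(() -> new CompletableFuture<String>()); // never answered
        System.out.println(fast.join());
        try {
            stuck.join();
        } catch (CompletionException e) {
            System.out.println("timed out: " + (e.getCause() instanceof TimeoutException));
        }
    }
}
```

A third call would block in `acquire()` only while two requests are still pending, which is the same back-pressure effect the capacity parameter has on the upstream operator.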

How Async I/O is implemented

At runtime, AsyncDataStream is translated into the AsyncWaitOperator, a subclass of AbstractUdfStreamOperator. Its basic working principle is as follows.

Basic principle

The biggest difference between AsyncWaitOperator and other operators is that its input and output are not synchronous.

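For this reason, AsyncWaitOperator internally uses a "producer-consumer" model, decoupling the asynchronous computation from the emission of results through a queue. StreamElementQueue provides the queue abstraction: a "consumer" thread, the Emitter, takes completed results out of it and emits them to the downstream operator, while the asynchronous requests play the role of the queue's "producers".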

As shown in the figure, AsyncWaitOperator consists of two main parts: StreamElementQueue and Emitter.

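StreamElementQueue is a queue of Promises. A Promise is an asynchronous abstraction for a value that will become available in the future; this queue holds the Promises of requests that have not yet been emitted, i.e. the requests in flight. The Emitter is a separate thread that sends messages (the received asynchronous responses) downstream.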

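In the figure, E5 denotes the fifth element entering the operator ("Element-5"). It is first wrapped into a Promise, P5, and P5 is put into the queue. Then the asyncInvoke method of the AsyncFunction is called, which issues an asynchronous request to the external service and registers a callback.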

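When the asynchronous request returns successfully, the callback calls AsyncCollector.collect to hand the result over to the framework (with the ResultFuture interface used in the example above, the equivalent call is resultFuture.complete).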

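In fact, the AsyncCollector is the Promise itself, P5. Calling collect marks the Promise as completed and notifies the Emitter thread that a completed message is ready to be sent. The Emitter then pulls the completed Promise from the queue, takes the message out of it, and sends it downstream.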
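The producer-consumer flow just described can be approximated with JDK primitives. In this simplified model (an assumption, not Flink's classes) each element is wrapped in a `CompletableFuture` acting as the Promise, the Promises are queued in arrival order, a simulated asynchronous call completes each Promise from another thread after a delay, and a dedicated emitter thread waits on the head Promise before forwarding its value. Because the emitter always waits for the head, this particular sketch behaves like the ordered mode. `CompletableFuture.delayedExecutor` requires Java 9+.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PromiseEmitterDemo {
    // Hypothetical end-of-input marker so the emitter knows when to stop.
    private static final CompletableFuture<String> END = new CompletableFuture<>();

    static List<String> run(List<String> elements) throws InterruptedException {
        BlockingQueue<CompletableFuture<String>> queue = new LinkedBlockingQueue<>();
        List<String> downstream = new ArrayList<>(); // only written by the emitter thread

        // Consumer: the emitter takes Promises in arrival order and waits for each one.
        Thread emitter = new Thread(() -> {
            try {
                while (true) {
                    CompletableFuture<String> promise = queue.take();
                    if (promise == END) {
                        return;
                    }
                    downstream.add(promise.join()); // returns once the callback completes the Promise
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "emitter");
        emitter.start();

        // Producer: wrap each element in a Promise, enqueue it, then fire the simulated async call.
        for (int i = 0; i < elements.size(); i++) {
            String element = elements.get(i);
            CompletableFuture<String> promise = new CompletableFuture<>();
            queue.put(promise);
            long delayMillis = 10L * (elements.size() - i); // later elements answer sooner
            // The "callback" completes the Promise on another thread after the delay.
            CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS)
                .execute(() -> promise.complete(element.toUpperCase()));
        }
        queue.put(END);
        emitter.join(); // join() makes the emitter's writes to downstream visible here
        return downstream;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(Arrays.asList("e1", "e2", "e3"))); // [E1, E2, E3]
    }
}
```

Even though the last element answers first, the output keeps arrival order, because the emitter only moves on once the head Promise is done.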

public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OperatorActions {

    /** Queue to store the currently in-flight stream elements into. */
    private transient StreamElementQueue queue;  // queue of requests awaiting their async results

    /** Pending stream element which could not yet added to the queue. */
    private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

    private transient ExecutorService executor;

    /** Emitter for the completed stream element queue entries. */
    private transient Emitter<OUT> emitter;  // consumer that emits results once they complete

    /** Thread running the emitter. */
    private transient Thread emitterThread;

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        this.checkpointingLock = getContainingTask().getCheckpointLock();
        this.inStreamElementSerializer = new StreamElementSerializer<>(
            getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

        // create the operators executor for the complete operations of the queue entries
        this.executor = Executors.newSingleThreadExecutor();
        // build a different StreamElementQueue depending on the output mode (ordered / unordered)
        switch (outputMode) {
            case ORDERED:
                queue = new OrderedStreamElementQueue(capacity, executor, this);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue(capacity, executor, this);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }
    }

    @Override
    public void open() throws Exception {
        super.open();
        // create the emitter
        this.emitter = new Emitter<>(checkpointingLock, output, queue, this);

        // start the emitter thread (the consumer thread)
        this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
        emitterThread.setDaemon(true);
        emitterThread.start();
        // ...
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
        // register a timer that calls the user function's timeout() method when the request times out
        if (timeout > 0L) {
            // register a timeout for this AsyncStreamRecordBufferEntry
            long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
            final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
                timeoutTimestamp,
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                    }
                });
            // Cancel the timer once we've completed the stream record buffer entry. This will remove
            // the register trigger task
            streamRecordBufferEntry.onComplete(
                (StreamElementQueueEntry<Collection<OUT>> value) -> {
                    timerFuture.cancel(true);
                },
                executor);
        }
        // add the entry to the queue
        addAsyncBufferEntry(streamRecordBufferEntry);
        // issue the asynchronous request
        userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }

    // Try to add the pending request to the queue. If the queue is full (the limit on
    // in-flight async requests has been reached), block until the emitter frees a slot.
    private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        assert(Thread.holdsLock(checkpointingLock));
        pendingStreamElementQueueEntry = streamElementQueueEntry;
        while (!queue.tryPut(streamElementQueueEntry)) {
            // we wait for the emitter to notify us if the queue has space left again
            checkpointingLock.wait();
        }
        pendingStreamElementQueueEntry = null;
    }
}

public class Emitter<OUT> implements Runnable {
    @Override
    public void run() {
        try {
            while (running) {
                LOG.debug("Wait for next completed async stream element result.");
                // block until a completed element is available in the queue, then pass it downstream
                AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
                output(streamElementEntry);
            }
        } catch (InterruptedException e) {
            // ...
        }
    }
}
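The timeout handling in processElement follows a common pattern: start one timer per request and cancel it if the request completes first. Below is a sketch of that pattern with a JDK `ScheduledExecutorService` standing in for Flink's ProcessingTimeService (an assumption for illustration). Here the timer fails the Promise with a `TimeoutException`; in Flink the timer instead calls `userFunction.timeout(...)`, which the user function can override.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutTimerDemo {
    // Daemon timer thread standing in for a processing-time service.
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "timeout-timer");
            thread.setDaemon(true);
            return thread;
        });

    // Register a timer that fails the Promise if it is still pending after timeoutMillis,
    // and cancel that timer as soon as the Promise completes for any reason.
    static <T> ScheduledFuture<?> registerTimeout(CompletableFuture<T> promise, long timeoutMillis) {
        ScheduledFuture<?> timer = TIMER.schedule(
            () -> {
                promise.completeExceptionally(new TimeoutException("async request timed out"));
            },
            timeoutMillis, TimeUnit.MILLISECONDS);
        promise.whenComplete((value, error) -> timer.cancel(false));
        return timer;
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>();
        registerTimeout(slow, 50);
        try {
            slow.join();
        } catch (CompletionException e) {
            System.out.println("slow request: " + e.getCause());
        }

        CompletableFuture<String> fast = new CompletableFuture<>();
        ScheduledFuture<?> timer = registerTimeout(fast, 1_000);
        fast.complete("ok");
        System.out.println("fast request: " + fast.join() + ", timer cancelled: " + timer.isCancelled());
    }
}
```

Cancelling the timer on completion matters for the same reason as in processElement: without it, every finished request would still leave a scheduled task behind until its deadline.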

Ordered mode

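In ordered mode, the results of all asynchronous requests must be emitted to the downstream operator in the order in which the records arrived. In this mode, StreamElementQueue is implemented by OrderedStreamElementQueue.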

OrderedStreamElementQueue is backed by a bounded queue. Entries for asynchronous requests are appended in arrival order, and a result can only be taken from the queue once the request at the head of the queue has completed.

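Ordered mode is relatively simple and can be implemented with a single queue. Every element entering the operator (including watermarks) is wrapped into a Promise and put into this queue in arrival order.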

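As shown in the figure below, even though P4's result returns first, it is not emitted; only when the result of P1 (the head of the queue) returns is the Emitter triggered to pull the head element and send it.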

public class OrderedStreamElementQueue implements StreamElementQueue {
    /** Capacity of this queue. */
    private final int capacity;

    /** Lock and conditions for the blocking queue. */
    private final ReentrantLock lock;
    private final Condition notFull;
    private final Condition headIsCompleted;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> queue;

    // Blockingly get an element whose async request has completed (without removing it)
    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // unblock only once the request at the head of the queue has completed
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            LOG.debug("Peeked head element from ordered stream element queue with filling degree " + "({}/{}).", queue.size(), capacity);
            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            notFull.signalAll();
            LOG.debug("Polled head element from ordered stream element queue. New filling degree " + "({}/{}).", queue.size() - 1, capacity);
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    // Add the request to the queue. If the queue is full (the limit on in-flight async
    // requests has been reached), return false; the caller then blocks.
    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            if (queue.size() < capacity) {
                addEntry(streamElementQueueEntry);
                LOG.debug("Put element into ordered stream element queue. New filling degree " + "({}/{}).", queue.size(), capacity);
                return true;
            } else {
                LOG.debug("Failed to put element into ordered stream element queue because it " + "was full ({}/{}).", queue.size(), capacity);
                return false;
            }
        } finally {
            lock.unlock();
        }
    }
}
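A stripped-down version of the same locking scheme, using only JDK classes, may make it easier to follow: one `ReentrantLock`, a `notFull` and a `headIsCompleted` condition, and an `ArrayDeque` of `CompletableFuture`s as the Promises. Simplifying assumptions: no watermarks, no separate callback executor, and `poll` removes the head directly instead of Flink's peek-then-poll pair. Class and method names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SimpleOrderedQueue<T> {
    private final int capacity;
    private final ArrayDeque<CompletableFuture<T>> queue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition headIsCompleted = lock.newCondition();

    SimpleOrderedQueue(int capacity) {
        this.capacity = capacity;
    }

    // Non-blocking insert: returns false when `capacity` entries are already queued.
    boolean tryPut(CompletableFuture<T> entry) {
        lock.lock();
        try {
            if (queue.size() >= capacity) {
                return false;
            }
            queue.addLast(entry);
            // Any completion wakes waiting consumers; they re-check whether the HEAD is done.
            entry.whenComplete((value, error) -> signalCompletion());
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Blocking insert, analogous to the wait loop in addAsyncBufferEntry.
    void put(CompletableFuture<T> entry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (!tryPut(entry)) {
                notFull.await();
            }
        } finally {
            lock.unlock();
        }
    }

    private void signalCompletion() {
        lock.lock();
        try {
            headIsCompleted.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Blocks until the head entry is done, then removes it and returns its value
    // (a failed entry rethrows its exception from join()).
    T poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peekFirst().isDone()) {
                headIsCompleted.await();
            }
            notFull.signalAll(); // a slot is being freed
            return queue.pollFirst().join();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleOrderedQueue<String> q = new SimpleOrderedQueue<>(4);
        CompletableFuture<String> p1 = new CompletableFuture<>();
        CompletableFuture<String> p4 = new CompletableFuture<>();
        q.put(p1);
        q.put(p4);
        p4.complete("P4"); // finishes first, but it is not at the head
        new Thread(() -> p1.complete("P1")).start();
        System.out.println(q.poll() + " " + q.poll()); // P1 P4
    }
}
```

Signalling on every completion and letting the consumer re-check the head keeps the callback trivial; the consumer's `while` loop is what enforces the ordering.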

Unordered mode

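In unordered mode, the emission order of asynchronous results is determined not by the order in which records arrive but by the order in which the asynchronous requests complete.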

Of course, when event time is used, the semantics of watermarks must still be preserved.

When processing time is used, there are no watermarks, so this can be treated as a special case.

UnorderedStreamElementQueue handles both cases with one neat design.

Unordered mode with ProcessingTime

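Unordered mode with ProcessingTime is also fairly simple. Since there are no watermarks, there is no ordering between watermarks and records to coordinate, so two queues are enough: an uncompletedQueue and a completedQueue. Every element entering the operator is likewise wrapped into a Promise and put into uncompletedQueue; as soon as any Promise in uncompletedQueue returns data, that Promise is moved to completedQueue and the Emitter is notified to consume it. This is shown in the figure below.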
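The two-queue idea for processing time fits in a few lines. In this sketch (illustrative names, and Java monitors instead of Flink's lock and conditions) pending Promises sit in a set, and each successful completion moves its result to a FIFO that the emitter drains, so results leave in completion order.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

class ProcessingTimeUnorderedQueue<T> {
    private final Set<CompletableFuture<T>> uncompleted = new HashSet<>(); // in-flight Promises
    private final ArrayDeque<T> completed = new ArrayDeque<>();           // results, non-null, in completion order

    synchronized void add(CompletableFuture<T> promise) {
        uncompleted.add(promise);
        // On success, move the result to the completed queue (failures are ignored in this sketch).
        promise.thenAccept(value -> onComplete(promise, value));
    }

    private synchronized void onComplete(CompletableFuture<T> promise, T value) {
        if (uncompleted.remove(promise)) {
            completed.addLast(value);
            notifyAll(); // wake the emitter
        }
    }

    // Emitter side: blocks until any result is available.
    synchronized T take() throws InterruptedException {
        while (completed.isEmpty()) {
            wait();
        }
        return completed.pollFirst();
    }

    synchronized int pending() {
        return uncompleted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessingTimeUnorderedQueue<String> q = new ProcessingTimeUnorderedQueue<>();
        CompletableFuture<String> p1 = new CompletableFuture<>();
        CompletableFuture<String> p2 = new CompletableFuture<>();
        q.add(p1);
        q.add(p2);
        p2.complete("P2"); // P2 answers first, so it is emitted first
        p1.complete("P1");
        System.out.println(q.take() + " " + q.take()); // P2 P1
    }
}
```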

Unordered mode with EventTime

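Unordered mode with EventTime is something of a hybrid of ordered mode and unordered mode with ProcessingTime. Because there are watermarks, the order between watermarks and records must be coordinated, so the elements stored in uncompletedQueue change from individual Promises to sets of Promises.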

If the element entering the operator is a record, it is wrapped into a Promise and added to the set at the tail of the queue.

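If the element entering the operator is a watermark, it is also wrapped into a Promise, but it is put into a set of its own; that set is appended to the tail of uncompletedQueue, and then a new empty set is created and appended after it. (If the head set is already empty at that moment, the code below simply adds the watermark to the head set.)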

In this way, each watermark becomes a boundary for the ordering of records.

Only a Promise in the set at the head of the queue can be moved to completedQueue once it returns data, to be consumed by the Emitter and sent downstream.

Only when the head set is empty can the next set be processed. This guarantees that a watermark is emitted only after every record that arrived before it has been emitted.

The process is shown in the figure below:

public class UnorderedStreamElementQueue implements StreamElementQueue {
    /** Queue of uncompleted stream element queue entries segmented by watermarks. */
    private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;

    /** Queue of completed stream element queue entries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;

    /** First (chronologically oldest) uncompleted set of stream element queue entries. */
    private Set<StreamElementQueueEntry<?>> firstSet;

    // Last (chronologically youngest) uncompleted set of stream element queue entries. New
    // stream element queue entries are inserted into this set.
    private Set<StreamElementQueueEntry<?>> lastSet;

    // (other fields such as capacity, numberEntries, executor, lock, notFull and
    // hasCompletedEntries are omitted in this excerpt)

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            if (numberEntries < capacity) {
                addEntry(streamElementQueueEntry);
                LOG.debug("Put element into unordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity);
                return true;
            } else {
                LOG.debug("Failed to put element into unordered stream element queue because it " + "was full ({}/{}).", numberEntries, capacity);
                return false;
            }
        } finally {
            lock.unlock();
        }
    }

    private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
        assert(lock.isHeldByCurrentThread());
        if (streamElementQueueEntry.isWatermark()) {
            // a watermark gets a set containing only itself in uncompletedQueue,
            // followed by a new, empty lastSet
            lastSet = new HashSet<>(capacity);
            if (firstSet.isEmpty()) {
                firstSet.add(streamElementQueueEntry);
            } else {
                Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
                watermarkSet.add(streamElementQueueEntry);
                uncompletedQueue.offer(watermarkSet);
            }
            uncompletedQueue.offer(lastSet);
        } else {
            lastSet.add(streamElementQueueEntry);  // a regular record goes into lastSet
        }

        // register the callback to run when the async request completes
        streamElementQueueEntry.onComplete(
            (StreamElementQueueEntry<T> value) -> {
                try {
                    onCompleteHandler(value);
                } catch (InterruptedException e) {
                    // ...
                }
            }, executor);
        numberEntries++;
    }

    // callback invoked when an async request completes
    public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // only a completed entry that belongs to firstSet is moved to completedQueue
            if (firstSet.remove(streamElementQueueEntry)) {
                completedQueue.offer(streamElementQueueEntry);
                while (firstSet.isEmpty() && firstSet != lastSet) {
                    // every request in firstSet is done: take the next set from uncompletedQueue as
                    // the new firstSet and move its already-completed entries to completedQueue
                    firstSet = uncompletedQueue.poll();
                    Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
                    while (it.hasNext()) {
                        StreamElementQueueEntry<?> bufferEntry = it.next();
                        if (bufferEntry.isDone()) {
                            completedQueue.offer(bufferEntry);
                            it.remove();
                        }
                    }
                }
                LOG.debug("Signal unordered stream element queue has completed entries.");
                hasCompletedEntries.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // wait until completedQueue contains an element
            while (completedQueue.isEmpty()) {
                hasCompletedEntries.await();
            }
            numberEntries--;
            notFull.signalAll();
            LOG.debug("Polled element from unordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity);
            return completedQueue.poll();
        } finally {
            lock.unlock();
        }
    }
}
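To check the segmentation logic by hand, here is a single-threaded re-implementation of addEntry and onCompleteHandler on string ids. Assumptions of this model: a watermark's Promise counts as complete as soon as it is added, `complete(id)` simulates an asynchronous reply, and `LinkedHashSet` replaces `HashSet` only so the output order is predictable. It mirrors firstSet, lastSet, uncompletedQueue and completedQueue, but it is not Flink's class.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class WatermarkSegmentedQueue {
    private final ArrayDeque<Set<String>> uncompletedQueue = new ArrayDeque<>();
    private final List<String> completedQueue = new ArrayList<>(); // emission order
    private final Set<String> done = new HashSet<>();               // entries whose result arrived
    private Set<String> firstSet = new LinkedHashSet<>();           // oldest segment
    private Set<String> lastSet = firstSet;                         // youngest segment

    // A record joins the youngest segment.
    void addRecord(String id) {
        lastSet.add(id);
    }

    // A watermark gets a segment of its own, followed by a fresh empty segment.
    void addWatermark(String id) {
        lastSet = new LinkedHashSet<>();
        if (firstSet.isEmpty()) {
            firstSet.add(id);
        } else {
            Set<String> watermarkSet = new LinkedHashSet<>();
            watermarkSet.add(id);
            uncompletedQueue.offer(watermarkSet);
        }
        uncompletedQueue.offer(lastSet);
        complete(id); // model assumption: a watermark's Promise is complete immediately
    }

    // Simulates the completion callback (onCompleteHandler) for one entry.
    void complete(String id) {
        done.add(id);
        if (firstSet.remove(id)) {
            completedQueue.add(id);
            // While the head segment is exhausted, advance and flush entries that are already done.
            while (firstSet.isEmpty() && firstSet != lastSet) {
                firstSet = uncompletedQueue.poll();
                Iterator<String> it = firstSet.iterator();
                while (it.hasNext()) {
                    String entry = it.next();
                    if (done.contains(entry)) {
                        completedQueue.add(entry);
                        it.remove();
                    }
                }
            }
        }
    }

    List<String> emitted() {
        return completedQueue;
    }

    public static void main(String[] args) {
        WatermarkSegmentedQueue q = new WatermarkSegmentedQueue();
        q.addRecord("r1");
        q.addRecord("r2");
        q.addWatermark("w1");
        q.addRecord("r3");
        q.complete("r3");                // r3 is behind w1, so nothing is emitted yet
        System.out.println(q.emitted()); // []
        q.complete("r2");
        q.complete("r1");
        System.out.println(q.emitted()); // [r2, r1, w1, r3]
    }
}
```

The trace shows both guarantees at once: r2 overtakes r1 inside the first segment, but r3 is held back until w1, and therefore r1 and r2, have been emitted.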

Fault tolerance

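In asynchronous mode, many requests may be in progress at the same time. Therefore, when a snapshot is taken, the records whose asynchronous calls have not yet completed, as well as those whose results have not yet been emitted downstream, must be written to state. On recovery, these records are read back from state and processed again. To guarantee exactly-once semantics, records whose asynchronous calls have completed and whose results have already been emitted downstream by the Emitter do not need to be stored in the snapshot.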

public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OperatorActions {

    /** Recovered input stream elements. */
    private transient ListState<StreamElement> recoveredStreamElements;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        recoveredStreamElements = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    }

    @Override
    public void open() throws Exception {
        super.open();
        // create the emitter and start the emitter (consumer) thread
        // ... (as in the open() method shown earlier)

        // process stream elements from state, since the Emit thread will start as soon as all
        // elements from previous state are in the StreamElementQueue, we have to make sure that the
        // order to open all operators in the operator chain proceeds from the tail operator to the
        // head operator.
        // On recovery, take every unfinished element out of state and process it again.
        if (recoveredStreamElements != null) {
            for (StreamElement element : recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    processElement(element.<IN>asRecord());
                } else if (element.isWatermark()) {
                    processWatermark(element.asWatermark());
                } else if (element.isLatencyMarker()) {
                    processLatencyMarker(element.asLatencyMarker());
                } else {
                    throw new IllegalStateException("Unknown record type " + element.getClass() + " encountered while opening the operator.");
                }
            }
            recoveredStreamElements = null;
        }
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        // clear the previous state first
        ListState<StreamElement> partitionableState =
            getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
        partitionableState.clear();

        // add the element of every request that is still in the queue (not yet emitted) to the state
        Collection<StreamElementQueueEntry<?>> values = queue.values();
        try {
            for (StreamElementQueueEntry<?> value : values) {
                partitionableState.add(value.getStreamElement());
            }

            // add the pending stream element queue entry if the stream element queue is currently full
            if (pendingStreamElementQueueEntry != null) {
                partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
            }
        } catch (Exception e) {
            partitionableState.clear();
            throw new Exception("Could not add stream element queue entries to operator state " + "backend of operator " + getOperatorName() + '.', e);
        }
    }
}
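The snapshot and restore contract can be illustrated without a state backend. In this toy model a plain `List` stands in for the operator's `ListState`, and `reprocess` is a hypothetical callback playing the role of processElement. The snapshot stores the input of every entry still in the queue plus the pending entry that could not be enqueued, and restore replays them in order. Entries whose results were already emitted have left the queue, so they are not stored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class AsyncSnapshotModel<IN> {
    private final List<IN> inFlight = new ArrayList<>(); // inputs whose results were not emitted yet
    private IN pending;                                  // input blocked on a full queue, if any

    void enqueue(IN element) {
        inFlight.add(element);
    }

    void setPending(IN element) {
        pending = element;
    }

    // The emitter has sent this element's result downstream, so it leaves the queue.
    void emitted(IN element) {
        inFlight.remove(element);
    }

    // Like snapshotState: a fresh state holding every un-emitted input, plus the pending one.
    List<IN> snapshot() {
        List<IN> state = new ArrayList<>(inFlight);
        if (pending != null) {
            state.add(pending);
        }
        return state;
    }

    // Like the recovery loop in open(): feed each stored element back into processing.
    static <E> void restore(List<E> state, Consumer<E> reprocess) {
        for (E element : state) {
            reprocess.accept(element);
        }
    }

    public static void main(String[] args) {
        AsyncSnapshotModel<String> model = new AsyncSnapshotModel<>();
        model.enqueue("a");
        model.enqueue("b");
        model.emitted("a");        // a's result already went downstream
        model.setPending("c");     // c was waiting for a free slot
        List<String> state = model.snapshot();
        System.out.println(state); // [b, c]
        restore(state, element -> System.out.println("reprocess " + element));
    }
}
```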