Flink 源碼深度解析-Async IO的實(shí)現(xiàn)

Hi,我是王知無,一個(gè)大數(shù)據(jù)領(lǐng)域的原創(chuàng)作者。? 放心關(guān)注我,獲取更多行業(yè)的一手消息。
Async I/O的使用方式
在Flink中使用Async I/O的話,需要有一個(gè)支持異步請求的客戶端,或者以多線程異步的方式來將同步操作轉(zhuǎn)化為異步操作調(diào)用;
以官方文檔給出的說明為例:
//?This?example?implements?the?asynchronous?request?and?callback?with?Futures?that?have?the
//?interface?of?Java?8's?futures?(which?is?the?same?one?followed?by?Flink's?Future)
/**
?*?An?implementation?of?the?'AsyncFunction'?that?sends?requests?and?sets?the?callback.
?*/
class?AsyncDatabaseRequest?extends?RichAsyncFunction>?{
????/**?The?database?specific?client?that?can?issue?concurrent?requests?with?callbacks?*/
????private?transient?DatabaseClient?client;
????@Override
????public?void?open(Configuration?parameters)?throws?Exception?{
????????client?=?new?DatabaseClient(host,?post,?credentials);
????}
????@Override
????public?void?close()?throws?Exception?{
????????client.close();
????}
????@Override
????public?void?asyncInvoke(String?key,?final?ResultFuture>?resultFuture)?throws?Exception?{
????????//?issue?the?asynchronous?request,?receive?a?future?for?result
????????//?發(fā)起異步請求,返回結(jié)果是一個(gè)Future
????????final?Future?result?=?client.query(key);
????????//?set?the?callback?to?be?executed?once?the?request?by?the?client?is?complete
????????//?the?callback?simply?forwards?the?result?to?the?result?future
????????//?請求完成時(shí)的回調(diào),將結(jié)果交給?ResultFuture
????????CompletableFuture.supplyAsync(new?Supplier()?{
????????????@Override
????????????public?String?get()?{
????????????????try?{
????????????????????return?result.get();
????????????????}?catch?(InterruptedException?|?ExecutionException?e)?{
????????????????????//?Normally?handled?explicitly.
????????????????????return?null;
????????????????}
????????????}
????????}).thenAccept(?(String?dbResult)?->?{
????????????resultFuture.complete(Collections.singleton(new?Tuple2<>(key,?dbResult)));
????????});
????}
}
//?create?the?original?stream
DataStream?stream?=?...;
//?apply?the?async?I/O?transformation
//?應(yīng)用async?I/O轉(zhuǎn)換,設(shè)置等待模式、超時(shí)時(shí)間、以及進(jìn)行中的異步請求的最大數(shù)量
DataStream>?resultStream?=
????AsyncDataStream.unorderedWait(stream,?new?AsyncDatabaseRequest(),?1000,?TimeUnit.MILLISECONDS,?100);
AsyncDataStream提供了兩種調(diào)用方法,分別是orderedWait和unorderedWait,這分別對應(yīng)了有序和無序兩種輸出模式。
之所以會提供兩種輸出模式,是因?yàn)楫惒秸埱蟮耐瓿蓵r(shí)間是不確定的,先發(fā)出的請求的完成時(shí)間可能會晚于后發(fā)出的請求。
在“有序”的輸出模式下,所有計(jì)算結(jié)果的提交完全和消息的到達(dá)順序一致; 而在“無序”的輸出模式下,計(jì)算結(jié)果的提交則是和請求的完成順序相關(guān)的,先處理完成的請求的計(jì)算結(jié)果會先提交。
值得注意的是,在使用“事件時(shí)間”的情況下,“無序”輸出模式仍然可以保證watermark的正常處理,即在兩個(gè)watermark之間的消息的異步請求結(jié)果可能是異步提交的,但在watermark之后的消息不能先于該watermark之前的消息提交。
由于異步請求的完成時(shí)間不確定,需要設(shè)置請求的超時(shí)時(shí)間,并配置同時(shí)進(jìn)行中的異步請求的最大數(shù)量。
Async I/O的實(shí)現(xiàn)
AsyncDataStream在運(yùn)行時(shí)被轉(zhuǎn)換為AsyncWaitOperator算子,它是AbstractUdfStreamOperator的子類。其AsyncWaitOperator的基本實(shí)現(xiàn)原理如下:
基本原理
AsyncWaitOperator算子相比于其它算子的最大不同在于,它的輸入和輸出并不是同步的。
因此,在AsyncWaitOperator內(nèi)部采用了一種“生產(chǎn)者-消費(fèi)者”模型,基于一個(gè)隊(duì)列解耦異步計(jì)算和計(jì)算結(jié)果的提交。StreamElementQueue提供了一種隊(duì)列的抽象,一個(gè)“消費(fèi)者”線程Emitter從中取出已完成的計(jì)算結(jié)果,并提交給下游算子,而異步請求則充當(dāng)了隊(duì)列“生產(chǎn)者”的角色;

如圖所示,AsyncWaitOperator主要由兩部分組成:StreamElementQueue和Emitter。
StreamElementQueue是一個(gè)Promise隊(duì)列,所謂Promise是一種異步抽象表示將來會有一個(gè)值,這個(gè)隊(duì)列是未完成的Promise隊(duì)列,也就是進(jìn)行中的請求隊(duì)列。Emitter是一個(gè)單獨(dú)的線程,負(fù)責(zé)發(fā)送消息(收到的異步回復(fù))給下游。
圖中E5表示進(jìn)入該算子的第五個(gè)元素(”Element-5”),在執(zhí)行過程中首先會將其包裝成一個(gè)“Promise” P5,然后將P5放入隊(duì)列。最后調(diào)用AsyncFunction的asyncInvoke方法,該方法會向外部服務(wù)發(fā)起一個(gè)異步的請求,并注冊回調(diào)。
該回調(diào)會在異步請求成功返回時(shí)調(diào)用AsyncCollector.collect方法將返回的結(jié)果交給框架處理。
實(shí)際上AsyncCollector是一個(gè)Promise,也就是 P5,在調(diào)用collect的時(shí)候會標(biāo)記Promise為完成狀態(tài),并通知Emitter線程有完成的消息可以發(fā)送了。Emitter就會從隊(duì)列中拉取完成的Promise,并從Promise中取出消息發(fā)送給下游。
public?class?AsyncWaitOperator
??????extends?AbstractUdfStreamOperator>
??????implements?OneInputStreamOperator,?OperatorActions?{
??????????
????/**?Queue?to?store?the?currently?in-flight?stream?elements?into.?*/
????private?transient?StreamElementQueue?queue;???????????????//?存儲帶有異步返回值的請求隊(duì)列
????
????/**?Pending?stream?element?which?could?not?yet?added?to?the?queue.?*/
????private?transient?StreamElementQueueEntry>?pendingStreamElementQueueEntry;
????
????private?transient?ExecutorService?executor;
????
????/**?Emitter?for?the?completed?stream?element?queue?entries.?*/
????private?transient?Emitter?emitter;??????????????????//?異步返回后的消費(fèi)線程
????
????/**?Thread?running?the?emitter.?*/
????private?transient?Thread?emitterThread;
????
????@Override
????public?void?setup(StreamTask,??>?containingTask,?StreamConfig?config,?Output>?output)?{
???????super.setup(containingTask,?config,?output);
???????this.checkpointingLock?=?getContainingTask().getCheckpointLock();
???????this.inStreamElementSerializer?=?new?StreamElementSerializer<>(getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader()));
????
???????//?create?the?operators?executor?for?the?complete?operations?of?the?queue?entries
???????this.executor?=?Executors.newSingleThreadExecutor();
???????//?根據(jù)不同的數(shù)據(jù)輸出模式?有序、無序;選擇構(gòu)建不同的StreamElementQueue queue
???????switch?(outputMode)?{
??????????case?ORDERED:
?????????????queue?=?new?OrderedStreamElementQueue(
????????????????capacity,
????????????????executor,
????????????????this);
?????????????break;
??????????case?UNORDERED:
?????????????queue?=?new?UnorderedStreamElementQueue(
????????????????capacity,
????????????????executor,
????????????????this);
?????????????break;
??????????default:
?????????????throw?new?IllegalStateException("Unknown?async?mode:?"?+?outputMode?+?'.');
???????}
????}
????
????@Override
????public?void?open()?throws?Exception?{
???????super.open();
???????//?create?the?emitter
???????this.emitter?=?new?Emitter<>(checkpointingLock,?output,?queue,?this);
????
???????//?start?the?emitter?thread
???????//?構(gòu)建?消費(fèi)者線程?emitter?Thread?
???????this.emitterThread?=?new?Thread(emitter,?"AsyncIO-Emitter-Thread?("?+?getOperatorName()?+?')');
???????emitterThread.setDaemon(true);
???????emitterThread.start();
???????//?.........
????}
????
????@Override
????public?void?processElement(StreamRecord?element)?throws?Exception?{
???????final?StreamRecordQueueEntry?streamRecordBufferEntry?=?new?StreamRecordQueueEntry<>(element);
???????//?注冊一個(gè)定時(shí)器,在超時(shí)時(shí)調(diào)用?timeout?方法
???????if?(timeout?>?0L)?{
??????????//?register?a?timeout?for?this?AsyncStreamRecordBufferEntry
??????????long?timeoutTimestamp?=?timeout?+?getProcessingTimeService().getCurrentProcessingTime();
??????????final?ScheduledFuture>?timerFuture?=?getProcessingTimeService().registerTimer(
?????????????timeoutTimestamp,
?????????????new?ProcessingTimeCallback()?{
????????????????@Override
????????????????public?void?onProcessingTime(long?timestamp)?throws?Exception?{
???????????????????userFunction.timeout(element.getValue(),?streamRecordBufferEntry);
????????????????}
?????????????});
??????????//?Cancel?the?timer?once?we've?completed?the?stream?record?buffer?entry.?This?will?remove
??????????//?the?register?trigger?task
??????????streamRecordBufferEntry.onComplete(
?????????????(StreamElementQueueEntry>?value)?->?{
????????????????timerFuture.cancel(true);
?????????????},
?????????????executor);
???????}
???????//?加入隊(duì)列
???????addAsyncBufferEntry(streamRecordBufferEntry);
???????//?發(fā)送異步請求
???????userFunction.asyncInvoke(element.getValue(),?streamRecordBufferEntry);
????}
?
?//嘗試將待完成的請求加入隊(duì)列,如果隊(duì)列已滿(到達(dá)異步請求的上限),會阻塞
?private??void?addAsyncBufferEntry(StreamElementQueueEntry?streamElementQueueEntry)?throws?InterruptedException?{
???????assert(Thread.holdsLock(checkpointingLock));
???????pendingStreamElementQueueEntry?=?streamElementQueueEntry;
?????? while (!queue.tryPut(streamElementQueueEntry))?{?//?將該請求加入隊(duì)列;如果隊(duì)列已滿(到達(dá)異步請求的上限),會阻塞
??????????//?we?wait?for?the?emitter?to?notify?us?if?the?queue?has?space?left?again
??????????checkpointingLock.wait();
???????}
???????pendingStreamElementQueueEntry?=?null;
????}
}
public?class?Emitter?implements?Runnable?{
????@Override
????public?void?run()?{
???????try?{
??????????while?(running)?{
?????????????LOG.debug("Wait?for?next?completed?async?stream?element?result.");
?????????????//?從隊(duì)列阻塞地獲取元素,之后再向下游傳遞
?????????????AsyncResult?streamElementEntry?=?streamElementQueue.peekBlockingly();
?????????????output(streamElementEntry);
??????????}
???????}?catch?(InterruptedException?e)?{
?????????????//?.........
???????}?
????}
}
有序模式
在“有序”模式下,所有異步請求的結(jié)果必須按照消息的到達(dá)順序提交到下游算子。在這種模式下,StreamElementQueue的具體是實(shí)現(xiàn)是OrderedStreamElementQueue。
OrderedStreamElementQueue的底層是一個(gè)有界的隊(duì)列,異步請求的計(jì)算結(jié)果按順序加入到隊(duì)列中,只有隊(duì)列頭部的異步請求完成后才可以從隊(duì)列中獲取計(jì)算結(jié)果。
有序模式比較簡單,使用一個(gè)隊(duì)列就能實(shí)現(xiàn)。所有新進(jìn)入該算子的元素(包括watermark),都會包裝成Promise并按到達(dá)順序放入該隊(duì)列。
如下圖所示,盡管P4的結(jié)果先返回,但并不會發(fā)送,只有P1(隊(duì)首)的結(jié)果返回了才會觸發(fā)Emitter拉取隊(duì)首元素進(jìn)行發(fā)送。如下圖所示:

public?class?OrderedStreamElementQueue?implements?StreamElementQueue?{
?/**?Capacity?of?this?queue.?*/
?private?final?int?capacity;
?/**?Queue?for?the?inserted?StreamElementQueueEntries.?*/
?private?final?ArrayDeque>?queue;
?
????@Override
????public?AsyncResult?peekBlockingly()?throws?InterruptedException?{??//?從隊(duì)列中阻塞地獲取已異步完成的元素
???????lock.lockInterruptibly();
???????try?{
??????????while?(queue.isEmpty()?||?!queue.peek().isDone())?{
?????????????headIsCompleted.await();
??????????}
??????????//?只有隊(duì)列頭部的請求完成后才解除阻塞狀態(tài)
??????????LOG.debug("Peeked?head?element?from?ordered?stream?element?queue?with?filling?degree?"?+?"({}/{}).",?queue.size(),?capacity);
??????????return?queue.peek();
???????}?finally?{
??????????lock.unlock();
???????}
????}
?
??@Override
????public?AsyncResult?poll()?throws?InterruptedException?{
???????lock.lockInterruptibly();
???????try?{
??????????while?(queue.isEmpty()?||?!queue.peek().isDone())?{
?????????????headIsCompleted.await();
??????????}
??????????notFull.signalAll();
??????????LOG.debug("Polled?head?element?from?ordered?stream?element?queue.?New?filling?degree?"?+?"({}/{}).",?queue.size()?-?1,?capacity);
??????????return?queue.poll();
???????}?finally?{
??????????lock.unlock();
???????}
????}
????
????@Override
????public??boolean?tryPut(StreamElementQueueEntry?streamElementQueueEntry)?throws?InterruptedException?{
?????? lock.lockInterruptibly();??//?將該請求加入隊(duì)列;如果隊(duì)列已滿(到達(dá)異步請求的上限),返回false,其外部會阻塞
???????try?{
??????????if?(queue.size()??????????????addEntry(streamElementQueueEntry);??
?????????????LOG.debug("Put?element?into?ordered?stream?element?queue.?New?filling?degree?"?+?"({}/{}).",?queue.size(),?capacity);
?????????????return?true;
??????????}?else?{
?????????????LOG.debug("Failed?to?put?element?into?ordered?stream?element?queue?because?it?"?+?"was?full?({}/{}).",?queue.size(),?capacity);
?????????????return?false;
??????????}
???????}?finally?{
??????????lock.unlock();
???????}
????}
}
無序模式
在“無序”模式下,異步計(jì)算結(jié)果的提交不是由消息到達(dá)的順序確定的,而是取決于異步請求的完成順序。
當(dāng)然,在使用“事件時(shí)間”的情況下,要保證watermark語義的正確性。
在使用“處理時(shí)間”的情況下,由于不存在Watermark,因此可以看作一種特殊的情況。
在UnorderedStreamElementQueue中巧妙地實(shí)現(xiàn)了這兩種情況。
ProcessingTime無序
ProcessingTime無序也比較簡單,因?yàn)闆]有watermark,不需要協(xié)調(diào)watermark與消息的順序性,所以使用兩個(gè)隊(duì)列就能實(shí)現(xiàn),一個(gè)uncompletedQueue、一個(gè)completedQueue。所有新進(jìn)入該算子的元素,同樣的包裝成Promise并放入uncompletedQueue隊(duì)列,當(dāng)uncompletedQueue隊(duì)列中任意的Promise返回了數(shù)據(jù),則將該P(yáng)romise移到completedQueue隊(duì)列中,并通知Emitter消費(fèi)。如下圖所示:

EventTime無序
EventTime無序類似于有序與ProcessingTime無序的結(jié)合體。因?yàn)橛衱atermark,需要協(xié)調(diào)watermark與消息之間的順序性,所以uncompletedQueue中存放的元素從原先的Promise變成了Promise集合。
如果進(jìn)入算子的是消息元素,則會包裝成Promise放入隊(duì)尾的集合中。
如果進(jìn)入算子的是watermark,也會包裝成Promise并放到一個(gè)獨(dú)立的集合中,再將該集合加入到uncompletedQueue隊(duì)尾,最后再創(chuàng)建一個(gè)空集合加到uncompletedQueue隊(duì)尾。
這樣,watermark就成了消息順序的邊界。
只有處在隊(duì)首的集合中的Promise返回了數(shù)據(jù),才能將該P(yáng)romise移到completedQueue隊(duì)列中,由Emitter消費(fèi)發(fā)往下游。
只有隊(duì)首集合空了,才能處理第二個(gè)集合。這樣就保證了當(dāng)且僅當(dāng)某個(gè)watermark之前所有的消息都已經(jīng)被發(fā)送了,該watermark才能被發(fā)送。
過程如下圖所示:

public?class?UnorderedStreamElementQueue?implements?StreamElementQueue?{
????/**?Queue?of?uncompleted?stream?element?queue?entries?segmented?by?watermarks.?*/
????private?final?ArrayDeque>>?uncompletedQueue;
????
????/**?Queue?of?completed?stream?element?queue?entries.?*/
????private?final?ArrayDeque>?completedQueue;
????
????/**?First?(chronologically?oldest)?uncompleted?set?of?stream?element?queue?entries.?*/
????private?Set>?firstSet;
????
????//?Last?(chronologically?youngest)?uncompleted?set?of?stream?element?queue?entries.?New
????//?stream?element?queue?entries?are?inserted?into?this?set.
????private?Set>?lastSet;
????@Override
????public??boolean?tryPut(StreamElementQueueEntry?streamElementQueueEntry)?throws?InterruptedException?{
???????lock.lockInterruptibly();
???????try?{
??????????if?(numberEntries??????????????addEntry(streamElementQueueEntry);
?????????????LOG.debug("Put?element?into?unordered?stream?element?queue.?New?filling?degree?"?+?"({}/{}).",?numberEntries,?capacity);
?????????????return?true;
??????????}?else?{
?????????????LOG.debug("Failed?to?put?element?into?unordered?stream?element?queue?because?it?"?+?"was?full?({}/{}).",?numberEntries,?capacity);
?????????????return?false;
??????????}
???????}?finally?{
??????????lock.unlock();
???????}
????}
????
????private??void?addEntry(StreamElementQueueEntry?streamElementQueueEntry)?{
???????assert(lock.isHeldByCurrentThread());
???????if?(streamElementQueueEntry.isWatermark())?{
??????????//?如果是watermark,就要構(gòu)造一個(gè)只包含這個(gè)watermark的set加入到uncompletedQueue隊(duì)列中
??????????lastSet?=?new?HashSet<>(capacity);
??????????if?(firstSet.isEmpty())?{
?????????????firstSet.add(streamElementQueueEntry);
??????????}?else?{
?????????????Set>?watermarkSet?=?new?HashSet<>(1);
?????????????watermarkSet.add(streamElementQueueEntry);
?????????????uncompletedQueue.offer(watermarkSet);
??????????}
??????????uncompletedQueue.offer(lastSet);
???????}?else?{
??????????lastSet.add(streamElementQueueEntry);??//?正常記錄,加入lastSet中
???????}
????
???????streamElementQueueEntry.onComplete(???????//?設(shè)置異步請求完成后的回調(diào)
??????????(StreamElementQueueEntry?value)?->?{
?????????????try?{
????????????????onCompleteHandler(value);
?????????????}?catch?(InterruptedException?e)?{
????????????????//?......
?????????????}
??????????},?executor);
???????numberEntries++;
????}
????//?異步請求完成的回調(diào)
????public?void?onCompleteHandler(StreamElementQueueEntry>?streamElementQueueEntry)?throws?InterruptedException?{
???????lock.lockInterruptibly();
???????try?{
??????????//?如果完成的異步請求在firstSet中,那么就將firstSet中已完成的異步請求轉(zhuǎn)移到completedQueue中
??????????if?(firstSet.remove(streamElementQueueEntry))?{??
?????????????completedQueue.offer(streamElementQueueEntry);
?????????????while?(firstSet.isEmpty()?&&?firstSet?!=?lastSet)?{
????????????????//?如果firset中所有的異步請求都完成了,那么就從uncompletedQueue獲取下一個(gè)集合作為firstSet
????????????????firstSet?=?uncompletedQueue.poll();
????????????????Iterator>?it?=?firstSet.iterator();
????????????????while?(it.hasNext())?{
???????????????????StreamElementQueueEntry>?bufferEntry?=?it.next();
???????????????????if?(bufferEntry.isDone())?{
??????????????????????completedQueue.offer(bufferEntry);
??????????????????????it.remove();
???????????????????}
????????????????}
?????????????}
?????????????LOG.debug("Signal?unordered?stream?element?queue?has?completed?entries.");
?????????????hasCompletedEntries.signalAll();
??????????}
???????}?finally?{
??????????lock.unlock();
???????}
????}
????
????@Override
????public?AsyncResult?poll()?throws?InterruptedException?{
???????lock.lockInterruptibly();
???????try?{
??????????//?等待completedQueue中的元素
??????????while?(completedQueue.isEmpty())?{
?????????????hasCompletedEntries.await();
??????????}
??????????numberEntries--;
??????????notFull.signalAll();
??????????LOG.debug("Polled?element?from?unordered?stream?element?queue.?New?filling?degree?"?+?"({}/{}).",?numberEntries,?capacity);
??????????return?completedQueue.poll();
???????}?finally?{
??????????lock.unlock();
???????}
????}????
}
容錯(cuò)
在異步調(diào)用模式下,可能會同時(shí)有很多個(gè)請求正在處理中。因而在進(jìn)行快照的時(shí)候,需要將異步調(diào)用尚未完成,以及結(jié)果尚未提交給下游的消息加入到狀態(tài)中。在恢復(fù)的時(shí)候,從狀態(tài)中取出這些消息,再重新處理一遍。為了保證exactly-once特性,對于異步調(diào)用已經(jīng)完成,且結(jié)果已經(jīng)由emitter提交給下游的消息就無需保存在快照中。
public?class?AsyncWaitOperator
??????extends?AbstractUdfStreamOperator>
??????implements?OneInputStreamOperator,?OperatorActions?{
??????????
?/**?Recovered?input?stream?elements.?*/
?private?transient?ListState?recoveredStreamElements;
????@Override
????public?void?initializeState(StateInitializationContext?context)?throws?Exception?{
???????super.initializeState(context);
???????recoveredStreamElements?=?context
??????????.getOperatorStateStore()
??????????.getListState(new?ListStateDescriptor<>(STATE_NAME,?inStreamElementSerializer));
????}
????
????@Override
????public?void?open()?throws?Exception?{
???????super.open();
???????//?create?the?emitter
???????//?創(chuàng)建emitter消費(fèi)線程
???????
???????//?process?stream?elements?from?state,?since?the?Emit?thread?will?start?as?soon?as?all
???????//?elements?from?previous?state?are?in?the?StreamElementQueue,?we?have?to?make?sure?that?the
???????//?order?to?open?all?operators?in?the?operator?chain?proceeds?from?the?tail?operator?to?the
???????//?head?operator.
???????//?狀態(tài)恢復(fù)的時(shí)候,從狀態(tài)中取出所有未完成的消息,重新處理一遍
???????if?(recoveredStreamElements?!=?null)?{
??????????for?(StreamElement?element?:?recoveredStreamElements.get())?{
?????????????if?(element.isRecord())?{
????????????????processElement(element.asRecord());
?????????????}
?????????????else?if?(element.isWatermark())?{
????????????????processWatermark(element.asWatermark());
?????????????}
?????????????else?if?(element.isLatencyMarker())?{
????????????????processLatencyMarker(element.asLatencyMarker());
?????????????}
?????????????else?{
????????????????throw?new?IllegalStateException("Unknown?record?type?"?+?element.getClass()?+?"?encountered?while?opening?the?operator.");
?????????????}
??????????}
??????????recoveredStreamElements?=?null;
???????}
????}
????
????@Override
????public?void?snapshotState(StateSnapshotContext?context)?throws?Exception?{
???????super.snapshotState(context);
???????//?先清除狀態(tài)
???????ListState?partitionableState?=
??????????getOperatorStateBackend().getListState(new?ListStateDescriptor<>(STATE_NAME,?inStreamElementSerializer));
???????partitionableState.clear();
???????
???????//?將所有未完成處理請求對應(yīng)的消息加入狀態(tài)中
???????Collection>?values?=?queue.values();
???????try?{
??????????for?(StreamElementQueueEntry>?value?:?values)?{
?????????????partitionableState.add(value.getStreamElement());
??????????}
????
??????????//?add?the?pending?stream?element?queue?entry?if?the?stream?element?queue?is?currently?full
??????????if?(pendingStreamElementQueueEntry?!=?null)?{
?????????????partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
??????????}
???????}?catch?(Exception?e)?{
??????????partitionableState.clear();
??????????throw?new?Exception("Could?not?add?stream?element?queue?entries?to?operator?state?"?+?"backend?of?operator?"?+?getOperatorName()?+?'.',?e);
???????}
????}
} 
