Flink Internals | Flink Source Code: Broadcast Stream State Explained

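Broadcast State is a special type of Operator State. It was introduced to support use cases in which the records of one stream must be broadcast to all downstream tasks, where they are used to maintain identical state in every subtask. That broadcast state can then be accessed while processing the records of a second stream. Broadcast state has a few characteristics of its own:
1. It must be defined as a Map structure.
2. It can only be modified on the broadcast side; the non-broadcast side cannot modify it.
3. At runtime, Broadcast State can only be kept in memory.
At this point you probably have the following questions:
1. Why must broadcast state be defined as a Map structure? Can't I use another state type?
2. Why can broadcast state only be modified on the broadcast side, and not on the non-broadcast side?
3. Why can broadcast state only be kept in memory? Can't it use the RocksDB state backend?
With these three questions in mind, let's read the relevant source code and answer them.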
broadcast source
/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are broadcasted
 * to every parallel instance of the next operation. In addition, it implicitly creates as many
 * {@link org.apache.flink.api.common.state.BroadcastState broadcast states} as the specified
 * descriptors which can be used to store the element of the stream.
 *
 * @param broadcastStateDescriptors the descriptors of the broadcast states to create.
 * @return A {@link BroadcastStream} which can be used in the {@link #connect(BroadcastStream)}
 *     to create a {@link BroadcastConnectedStream} for further processing of the elements.
 */
@PublicEvolving
public BroadcastStream<T> broadcast(
        final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
    Preconditions.checkNotNull(broadcastStateDescriptors);
    final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
    return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
}
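As you can see, the broadcast method takes MapStateDescriptor arguments, that is, descriptors for Map-structured state. Any other descriptor type is rejected by the compiler, so the state has to be declared with a MapStateDescriptor. The main reason is that broadcast state exists to be joined against the non-broadcast stream. Think of it as a two-stream join: a join needs a key, and only records with the same key match. A Map (key-value) structure suits this scenario best: the key holds the join field and the value can be broadcast data of any type. When joining, you only need to obtain the broadcast state and call state.get(key) to retrieve the broadcast data.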
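The join-by-key idea can be sketched without Flink at all. The class below is a plain-JDK illustration, not Flink code: BroadcastLookupSketch, onBroadcastElement, and enrich are hypothetical names, and a HashMap stands in for the broadcast state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class BroadcastLookupSketch {
    // Hypothetical stand-in for the broadcast state: join key -> broadcast payload.
    private final Map<String, String> broadcastState = new HashMap<>();

    /** Called for each record of the broadcast stream: store it under its join key. */
    public void onBroadcastElement(String key, String payload) {
        broadcastState.put(key, payload);
    }

    /** Called for each record of the non-broadcast stream: one keyed lookup. */
    public Optional<String> enrich(String joinKey) {
        return Optional.ofNullable(broadcastState.get(joinKey));
    }

    public static void main(String[] args) {
        BroadcastLookupSketch sketch = new BroadcastLookupSketch();
        sketch.onBroadcastElement("flink", "stream processing");
        System.out.println(sketch.enrich("flink").orElse("no match"));
        System.out.println(sketch.enrich("hive").orElse("no match"));
    }
}
```

With a list-shaped state the lookup would have to scan every broadcast record; with a map it is a single get on the join field, which is the point the paragraph above makes.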
process source
@PublicEvolving
public <KEY, OUT> SingleOutputStreamOperator<OUT> process(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function) {
    // Determine the type information of the output data
    TypeInformation<OUT> outTypeInfo =
            TypeExtractor.getBinaryOperatorReturnType(
                    function,
                    KeyedBroadcastProcessFunction.class,
                    1,
                    2,
                    3,
                    TypeExtractor.NO_INDEX,
                    getType1(),
                    getType2(),
                    Utils.getCallLocationName(),
                    true);
    return process(function, outTypeInfo);
}
The process method takes a KeyedBroadcastProcessFunction as its argument and delegates to the overload below, which additionally receives the output type information.
process source
@PublicEvolving
public <KEY, OUT> SingleOutputStreamOperator<OUT> process(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function,
        final TypeInformation<OUT> outTypeInfo) {
    Preconditions.checkNotNull(function);
    Preconditions.checkArgument(
            nonBroadcastStream instanceof KeyedStream,
            "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");

    return transform(function, outTypeInfo);
}
Apart from validating its arguments (the function must not be null and the non-broadcast stream must be a KeyedStream), this process overload does no work of its own and calls transform directly.
transform source
@Internal
private <KEY, OUT> SingleOutputStreamOperator<OUT> transform(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> userFunction,
        final TypeInformation<OUT> outTypeInfo) {
    // read the output type of the input Transforms to coax out errors about MissingTypeInfo
    nonBroadcastStream.getType();
    broadcastStream.getType();
    KeyedStream<IN1, KEY> keyedInputStream = (KeyedStream<IN1, KEY>) nonBroadcastStream;
    // Build the KeyedBroadcastStateTransformation
    final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation =
            new KeyedBroadcastStateTransformation<>(
                    "Co-Process-Broadcast-Keyed",
                    nonBroadcastStream.getTransformation(),
                    broadcastStream.getTransformation(),
                    clean(userFunction),
                    broadcastStateDescriptors,
                    keyedInputStream.getKeyType(),
                    keyedInputStream.getKeySelector(),
                    outTypeInfo,
                    environment.getParallelism());
    @SuppressWarnings({"unchecked", "rawtypes"})
    final SingleOutputStreamOperator<OUT> returnStream =
            new SingleOutputStreamOperator(environment, transformation);
    // Add it to the List<Transformation<?>> collection
    getExecutionEnvironment().addOperator(transformation);
    return returnStream;
}
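The transform method mainly does two things:
1. It constructs the corresponding KeyedBroadcastStateTransformation object. KeyedBroadcastStateTransformation is itself a subclass of Transformation.
2. It adds the constructed transformation to the List<Transformation<?>> collection. Later, when the StreamGraph is built, the Transformations are read from this collection.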
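The "record now, build the graph later" pattern described above can be sketched in plain Java. This is only an illustration under simplifying assumptions: LazyPlanSketch is a hypothetical class, a transformation is reduced to its name, and the "graph" is just an ordered copy of the recorded names.

```java
import java.util.ArrayList;
import java.util.List;

class LazyPlanSketch {
    // Hypothetical stand-in for the environment's list of transformations.
    private final List<String> transformations = new ArrayList<>();

    /** API calls such as process(...) only record a step; nothing is executed yet. */
    public void addOperator(String transformationName) {
        transformations.add(transformationName);
    }

    /** Builds the "graph" from a copy, so later additions cannot interfere with it. */
    public List<String> getGraph(boolean clearTransformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException("No operators defined. Cannot execute.");
        }
        List<String> graph = new ArrayList<>(transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    public static void main(String[] args) {
        LazyPlanSketch env = new LazyPlanSketch();
        env.addOperator("Source");
        env.addOperator("Co-Process-Broadcast-Keyed");
        System.out.println(env.getGraph(true));
    }
}
```

The defensive copy is the design choice worth noticing: the graph is generated from a snapshot of the list, so a transformation added afterwards does not change a graph that is already being built.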
getStreamGraph source
@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
    if (clearTransformations) {
        transformations.clear();
    }
    return streamGraph;
}
The main job of getStreamGraph is to generate the StreamGraph. This is where the List<Transformation<?>> built in the previous step is used.
getStreamGraphGenerator source
private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }
    // We copy the transformation so that newly added transformations cannot intervene with the
    // stream graph generation.
    return new StreamGraphGenerator(
                    new ArrayList<>(transformations), config, checkpointCfg, configuration)
            .setStateBackend(defaultStateBackend)
            .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
            .setSavepointDir(defaultSavepointDirectory)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout)
            .setSlotSharingGroupResource(slotSharingGroupResources);
}
getStreamGraphGenerator mainly constructs the StreamGraphGenerator object. Once it is constructed, its generate method can be called to produce the StreamGraph. Before looking at generate, let's look at the static initializer block of StreamGraphGenerator.
StreamGraphGenerator source
static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
            tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(
            TimestampsAndWatermarksTransformation.class,
            new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    tmp.put(
            KeyedBroadcastStateTransformation.class,
            new KeyedBroadcastStateTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}
Before a StreamGraphGenerator is instantiated, its static initializer block runs and builds a Map from each Transformation class to its TransformationTranslator. This Map is used later.
transform source
// Look up the TransformationTranslator registered for this Transformation
final TransformationTranslator<?, Transformation<?>> translator =
        (TransformationTranslator<?, Transformation<?>>)
                translatorMap.get(transform.getClass());
Collection<Integer> transformedIds;
if (translator != null) {
    transformedIds = translate(translator, transform);
} else {
    transformedIds = legacyTransform(transform);
}
Once the StreamGraphGenerator has been constructed, generate is called, which in turn calls transform. transform looks up the matching TransformationTranslator in the Map built above and then calls translate.
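The lookup-with-fallback dispatch can be sketched in isolation as follows. Everything here is a simplified assumption for illustration: the nested Transformation classes are empty stand-ins for Flink's types, a translator is reduced to a Function that returns a label, and the string "legacy path" stands in for legacyTransform.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class TranslatorLookupSketch {
    // Hypothetical, empty stand-ins for Flink's Transformation hierarchy.
    static class Transformation {}
    static class KeyedBroadcastStateTransformation extends Transformation {}
    static class CustomTransformation extends Transformation {}

    // Built once in a static initializer and never modified afterwards.
    private static final Map<Class<? extends Transformation>, Function<Transformation, String>>
            TRANSLATORS;

    static {
        Map<Class<? extends Transformation>, Function<Transformation, String>> tmp =
                new HashMap<>();
        tmp.put(KeyedBroadcastStateTransformation.class, t -> "keyed-broadcast translator");
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    /** Dispatches on the exact runtime class; unregistered classes take the fallback. */
    public static String transform(Transformation t) {
        Function<Transformation, String> translator = TRANSLATORS.get(t.getClass());
        return translator != null ? translator.apply(t) : "legacy path";
    }

    public static void main(String[] args) {
        System.out.println(transform(new KeyedBroadcastStateTransformation()));
        System.out.println(transform(new CustomTransformation()));
    }
}
```

Because the key is the exact class returned by getClass(), a subclass of a registered type would not match in this sketch; it would fall through to the fallback branch.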
translate#translateForStreaming#translateForStreamingInternal source
@Override
protected Collection<Integer> translateForStreamingInternal(
        final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);
    // Build the CoBroadcastWithKeyedOperator
    CoBroadcastWithKeyedOperator<KEY, IN1, IN2, OUT> operator =
            new CoBroadcastWithKeyedOperator<>(
                    transformation.getUserFunction(),
                    transformation.getBroadcastStateDescriptors());
    return translateInternal(
            transformation,
            transformation.getRegularInput(),
            transformation.getBroadcastInput(),
            SimpleOperatorFactory.of(operator),
            transformation.getStateKeyType(),
            transformation.getKeySelector(),
            null /* no key selector on broadcast input */,
            context);
}
translate eventually reaches the translateForStreamingInternal method of KeyedBroadcastStateTransformationTranslator, which builds a CoBroadcastWithKeyedOperator object from the UserFunction (the user code) and the broadcastStateDescriptors (the broadcast state descriptors).
CoBroadcastWithKeyedOperator source
/**
 * A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction
 * KeyedBroadcastProcessFunctions}.
 *
 * @param <KS> The key type of the input keyed stream.
 * @param <IN1> The input type of the keyed (non-broadcast) side.
 * @param <IN2> The input type of the broadcast side.
 * @param <OUT> The output type of the operator.
 */
@Internal
public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
        extends AbstractUdfStreamOperator<OUT, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>>
        implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<KS, VoidNamespace> {
    private static final long serialVersionUID = 5926499536290284870L;
    private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
    private transient TimestampedCollector<OUT> collector;
    private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
    private transient ReadWriteContextImpl rwContext;
    private transient ReadOnlyContextImpl rContext;
    private transient OnTimerContextImpl onTimerContext;
    public CoBroadcastWithKeyedOperator(
            final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
            final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
        super(function);
        this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors);
    }
    @Override
    public void open() throws Exception {
        super.open();
        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);
        collector = new TimestampedCollector<>(output);
        this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
        for (MapStateDescriptor<?, ?> descriptor : broadcastStateDescriptors) {
            broadcastStates.put(
                    descriptor,
                    // Initialize the state implementation instance
                    getOperatorStateBackend().getBroadcastState(descriptor));
        }
        rwContext =
                new ReadWriteContextImpl(
                        getExecutionConfig(),
                        getKeyedStateBackend(),
                        userFunction,
                        broadcastStates,
                        timerService);
        rContext =
                new ReadOnlyContextImpl(
                        getExecutionConfig(), userFunction, broadcastStates, timerService);
        onTimerContext =
                new OnTimerContextImpl(
                        getExecutionConfig(), userFunction, broadcastStates, timerService);
    }
    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        rContext.setElement(element);
        userFunction.processElement(element.getValue(), rContext, collector);
        rContext.setElement(null);
    }
    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        rwContext.setElement(element);
        userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
        rwContext.setElement(null);
    }
    private class ReadWriteContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {
        private final ExecutionConfig config;
        private final KeyedStateBackend<KS> keyedStateBackend;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;
        private StreamRecord<IN2> element;
        ReadWriteContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedStateBackend<KS> keyedStateBackend,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }
        void setElement(StreamRecord<IN2> e) {
            this.element = e;
        }
        @Override
        public Long timestamp() {
            checkState(element != null);
            return element.getTimestamp();
        }
        @Override
        public <K, V> BroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);
            stateDescriptor.initializeSerializerUnlessSet(config);
            BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }
        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }
        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }
        @Override
        public <VS, S extends State> void applyToKeyedState(
                final StateDescriptor<S, VS> stateDescriptor,
                final KeyedStateFunction<KS, S> function)
                throws Exception {
            keyedStateBackend.applyToAllKeys(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    Preconditions.checkNotNull(stateDescriptor),
                    Preconditions.checkNotNull(function));
        }
    }
    private class ReadOnlyContextImpl extends ReadOnlyContext {
        private final ExecutionConfig config;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;
        private StreamRecord<IN1> element;
        ReadOnlyContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }
        void setElement(StreamRecord<IN1> e) {
            this.element = e;
        }
        @Override
        public Long timestamp() {
            checkState(element != null);
            return element.hasTimestamp() ? element.getTimestamp() : null;
        }
        @Override
        public TimerService timerService() {
            return timerService;
        }
        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }
        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }
        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);
            stateDescriptor.initializeSerializerUnlessSet(config);
            ReadOnlyBroadcastState<K, V> state =
                    (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }
        @Override
        @SuppressWarnings("unchecked")
        public KS getCurrentKey() {
            return (KS) CoBroadcastWithKeyedOperator.this.getCurrentKey();
        }
    }
    private class OnTimerContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext {
        private final ExecutionConfig config;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;
        private TimeDomain timeDomain;
        private InternalTimer<KS, VoidNamespace> timer;
        OnTimerContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }
        @Override
        public Long timestamp() {
            checkState(timer != null);
            return timer.getTimestamp();
        }
        @Override
        public TimeDomain timeDomain() {
            checkState(timeDomain != null);
            return timeDomain;
        }
        @Override
        public KS getCurrentKey() {
            return timer.getKey();
        }
        @Override
        public TimerService timerService() {
            return timerService;
        }
        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }
        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
        }
        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);
            stateDescriptor.initializeSerializerUnlessSet(config);
            ReadOnlyBroadcastState<K, V> state =
                    (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }
    }
}
Before analyzing the CoBroadcastWithKeyedOperator source, let's first look at its UML diagram.
[Figure: CoBroadcastWithKeyedOperator UML diagram]

As the diagram shows, CoBroadcastWithKeyedOperator implements the TwoInputStreamOperator interface. As the name suggests, this is the StreamOperator interface for operators with two input streams. CoBroadcastWithKeyedOperator implements it because two data streams are connected upstream of it. Next, let's look at the TwoInputStreamOperator source.
TwoInputStreamOperator source
/**
 * Interface for stream operators with two inputs. Use {@link
 * org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if you want to
 * implement a custom operator.
 *
 * @param <IN1> The input type of the operator
 * @param <IN2> The input type of the operator
 * @param <OUT> The output type of the operator
 */
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
    /**
     * Processes one element that arrived on the first input of this two-input operator. This method
     * is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement1(StreamRecord<IN1> element) throws Exception;
    /**
     * Processes one element that arrived on the second input of this two-input operator. This
     * method is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement2(StreamRecord<IN2> element) throws Exception;
}
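The TwoInputStreamOperator interface defines two methods. In CoBroadcastWithKeyedOperator, processElement1 handles records from the non-broadcast stream and processElement2 handles records from the broadcast stream.
Now back to the open method of CoBroadcastWithKeyedOperator. It first initializes broadcastStates, which holds the MapStateDescriptor -> BroadcastState mapping, and then creates the ReadWriteContextImpl and ReadOnlyContextImpl objects. As the names suggest, ReadWriteContextImpl can both read and write state, while ReadOnlyContextImpl can only read it. open also does one more important thing: while filling broadcastStates, it initializes the implementation class of the broadcast state through getOperatorStateBackend().getBroadcastState(descriptor).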
getBroadcastState source
public <K, V> BroadcastState<K, V> getBroadcastState(
        final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
    Preconditions.checkNotNull(stateDescriptor);
    String name = Preconditions.checkNotNull(stateDescriptor.getName());
    BackendWritableBroadcastState<K, V> previous =
            (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
    if (previous != null) {
        checkStateNameAndMode(
                previous.getStateMetaInfo().getName(),
                name,
                previous.getStateMetaInfo().getAssignmentMode(),
                OperatorStateHandle.Mode.BROADCAST);
        return previous;
    }
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    TypeSerializer<K> broadcastStateKeySerializer =
            Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
    TypeSerializer<V> broadcastStateValueSerializer =
            Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
    BackendWritableBroadcastState<K, V> broadcastState =
            (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
    if (broadcastState == null) {
        broadcastState =
                new HeapBroadcastState<>(
                        new RegisteredBroadcastStateBackendMetaInfo<>(
                                name,
                                OperatorStateHandle.Mode.BROADCAST,
                                broadcastStateKeySerializer,
                                broadcastStateValueSerializer));
        registeredBroadcastStates.put(name, broadcastState);
    } else {
        // has restored state; check compatibility of new state access
        checkStateNameAndMode(
                broadcastState.getStateMetaInfo().getName(),
                name,
                broadcastState.getStateMetaInfo().getAssignmentMode(),
                OperatorStateHandle.Mode.BROADCAST);
        RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo =
                broadcastState.getStateMetaInfo();
        // check whether new serializers are incompatible
        TypeSerializerSchemaCompatibility<K> keyCompatibility =
                restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
        if (keyCompatibility.isIncompatible()) {
            throw new StateMigrationException(
                    "The new key typeSerializer for broadcast state must not be incompatible.");
        }
        TypeSerializerSchemaCompatibility<V> valueCompatibility =
                restoredBroadcastStateMetaInfo.updateValueSerializer(
                        broadcastStateValueSerializer);
        if (valueCompatibility.isIncompatible()) {
            throw new StateMigrationException(
                    "The new value typeSerializer for broadcast state must not be incompatible.");
        }
        broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
    }
    accessedBroadcastStatesByName.put(name, broadcastState);
    return broadcastState;
}
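The main job of getBroadcastState is to create the HeapBroadcastState object, which is the concrete implementation class of broadcast state. If a state with the same name has already been accessed, or was restored, that instance is reused after the name, mode, and serializer compatibility checks. Next, let's look at the HeapBroadcastState source.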
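The control flow of getBroadcastState (return the cached instance, else reuse a restored one, else create a new heap-backed one) can be sketched as below. This is a deliberately reduced model, not Flink's backend: StateRegistrySketch and restore are hypothetical names, the state is a plain HashMap of strings, and the serializer compatibility checks are left out.

```java
import java.util.HashMap;
import java.util.Map;

class StateRegistrySketch {
    // States known to the backend, either restored from a snapshot or created on first access.
    private final Map<String, Map<String, String>> registered = new HashMap<>();
    // States that have already been handed out during this run.
    private final Map<String, Map<String, String>> accessed = new HashMap<>();

    public Map<String, String> getBroadcastState(String name) {
        Map<String, String> previous = accessed.get(name);
        if (previous != null) {
            return previous; // fast path: the same instance is returned on repeated access
        }
        Map<String, String> state = registered.get(name);
        if (state == null) {
            state = new HashMap<>(); // nothing restored: create a new heap-backed map
            registered.put(name, state);
        }
        accessed.put(name, state);
        return state;
    }

    /** Hypothetical helper that simulates restoring a state before its first access. */
    public void restore(String name, Map<String, String> contents) {
        registered.put(name, new HashMap<>(contents));
    }

    public static void main(String[] args) {
        StateRegistrySketch backend = new StateRegistrySketch();
        Map<String, String> first = backend.getBroadcastState("rules");
        Map<String, String> second = backend.getBroadcastState("rules");
        System.out.println(first == second);
    }
}
```

Keying the registry by state name is why two descriptors with the same name resolve to the same state object.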
HeapBroadcastState source
/**
 * A {@link BroadcastState Broadcast State} backed by a heap-based {@link Map}.
 *
 * @param <K> The key type of the elements in the {@link BroadcastState Broadcast State}.
 * @param <V> The value type of the elements in the {@link BroadcastState Broadcast State}.
 */
public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {
    /** Meta information of the state, including state name, assignment mode, and serializer. */
    private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;
    /** The internal map that holds the elements of the state. */
    private final Map<K, V> backingMap;
    /** A serializer that allows to perform deep copies of internal map state. */
    private final MapSerializer<K, V> internalMapCopySerializer;
    HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
        this(stateMetaInfo, new HashMap<>());
    }
    private HeapBroadcastState(
            final RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo,
            final Map<K, V> internalMap) {
        this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
        this.backingMap = Preconditions.checkNotNull(internalMap);
        this.internalMapCopySerializer =
                new MapSerializer<>(
                        stateMetaInfo.getKeySerializer(), stateMetaInfo.getValueSerializer());
    }
    private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) {
        this(
                toCopy.stateMetaInfo.deepCopy(),
                toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
    }
    @Override
    public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
        this.stateMetaInfo = stateMetaInfo;
    }
    @Override
    public RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo() {
        return stateMetaInfo;
    }
    @Override
    public HeapBroadcastState<K, V> deepCopy() {
        return new HeapBroadcastState<>(this);
    }
    @Override
    public void clear() {
        backingMap.clear();
    }
    @Override
    public String toString() {
        return "HeapBroadcastState{"
                + "stateMetaInfo="
                + stateMetaInfo
                + ", backingMap="
                + backingMap
                + ", internalMapCopySerializer="
                + internalMapCopySerializer
                + '}';
    }
    @Override
    public long write(FSDataOutputStream out) throws IOException {
        long partitionOffset = out.getPos();
        DataOutputView dov = new DataOutputViewStreamWrapper(out);
        dov.writeInt(backingMap.size());
        for (Map.Entry<K, V> entry : backingMap.entrySet()) {
            getStateMetaInfo().getKeySerializer().serialize(entry.getKey(), dov);
            getStateMetaInfo().getValueSerializer().serialize(entry.getValue(), dov);
        }
        return partitionOffset;
    }
    @Override
    public V get(K key) {
        return backingMap.get(key);
    }
    @Override
    public void put(K key, V value) {
        backingMap.put(key, value);
    }
    @Override
    public void putAll(Map<K, V> map) {
        backingMap.putAll(map);
    }
    @Override
    public void remove(K key) {
        backingMap.remove(key);
    }
    @Override
    public boolean contains(K key) {
        return backingMap.containsKey(key);
    }
    @Override
    public Iterator<Map.Entry<K, V>> iterator() {
        return backingMap.entrySet().iterator();
    }
    @Override
    public Iterable<Map.Entry<K, V>> entries() {
        return backingMap.entrySet();
    }
    @Override
    public Iterable<Map.Entry<K, V>> immutableEntries() {
        return Collections.unmodifiableSet(backingMap.entrySet());
    }
}
The HeapBroadcastState code is fairly simple. It consists mainly of read and write operations on the state, which in essence are operations on a HashMap.
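The layout produced by the write method above (entry count first, then each key followed by its value) can be tried out with plain JDK streams. This is a sketch under assumptions: HeapStateSnapshotSketch is a hypothetical class, writeUTF and writeInt stand in for Flink's key and value TypeSerializers, the method returns the bytes instead of a partition offset, and the read method is an assumed counterpart that the article does not show.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class HeapStateSnapshotSketch {
    /** Writes the entry count, then every key/value pair, mirroring the loop in write(). */
    public static byte[] write(Map<String, Integer> backingMap) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(backingMap.size());
            for (Map.Entry<String, Integer> entry : backingMap.entrySet()) {
                out.writeUTF(entry.getKey());   // assumed key serializer
                out.writeInt(entry.getValue()); // assumed value serializer
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Assumed counterpart: reads the count, then that many key/value pairs. */
    public static Map<String, Integer> read(byte[] snapshot) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            int size = in.readInt();
            Map<String, Integer> restored = new HashMap<>();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                int value = in.readInt();
                restored.put(key, value);
            }
            return restored;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state.put("flink", 1);
        state.put("spark", 2);
        System.out.println(read(write(state)).equals(state));
    }
}
```

Writing the size first is what lets a reader know how many pairs to consume without any end marker.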
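Now back to CoBroadcastWithKeyedOperator: processElement1 uses ReadOnlyContextImpl, while processElement2 uses ReadWriteContextImpl. In other words, the state can be modified only on the broadcast side and not on the non-broadcast side. This corresponds to the second question above.
Both the broadcast side and the non-broadcast side can obtain the state, but the return type of getBroadcastState differs between them.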
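[Figure: BroadcastState & ReadOnlyBroadcastState UML diagram]

The BroadcastState interface extends the ReadOnlyBroadcastState interface, which in turn extends the State interface. The only implementation class of BroadcastState is HeapBroadcastState, and the name alone tells you that broadcast state is stored on the JVM heap. Underneath it is just a Map: the backingMap shown in the diagram is what holds the state data. This corresponds to the third question above.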
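How one state object can be writable on one side and read-only on the other comes down to the static type under which it is handed out. The sketch below illustrates that with hypothetical interfaces (ReadOnlyState, WritableState) and a hypothetical HeapState class; it mirrors the shape of the hierarchy described above, not Flink's actual classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class StateViewSketch {
    /** Read-only contract, analogous in spirit to ReadOnlyBroadcastState. */
    interface ReadOnlyState<K, V> {
        V get(K key);
        boolean contains(K key);
        Iterable<Map.Entry<K, V>> immutableEntries();
    }

    /** Adds the mutating methods, analogous in spirit to BroadcastState. */
    interface WritableState<K, V> extends ReadOnlyState<K, V> {
        void put(K key, V value);
        void remove(K key);
    }

    /** Single heap-backed implementation serving both views. */
    static class HeapState<K, V> implements WritableState<K, V> {
        private final Map<K, V> backingMap = new HashMap<>();
        @Override public V get(K key) { return backingMap.get(key); }
        @Override public boolean contains(K key) { return backingMap.containsKey(key); }
        @Override public Iterable<Map.Entry<K, V>> immutableEntries() {
            return Collections.unmodifiableSet(backingMap.entrySet());
        }
        @Override public void put(K key, V value) { backingMap.put(key, value); }
        @Override public void remove(K key) { backingMap.remove(key); }
    }

    private final HeapState<String, String> state = new HeapState<>();

    /** What the broadcast side receives: full read-write access. */
    public WritableState<String, String> broadcastSideView() { return state; }

    /** What the non-broadcast side receives: the same object under a narrower type. */
    public ReadOnlyState<String, String> keyedSideView() { return state; }

    public static void main(String[] args) {
        StateViewSketch operator = new StateViewSketch();
        operator.broadcastSideView().put("flink", "rule-1");
        System.out.println(operator.keyedSideView().get("flink"));
        // operator.keyedSideView().put("hive", "x"); // would not compile: no put on ReadOnlyState
    }
}
```

In this sketch the restriction is enforced by the compiler only; a deliberate downcast of the read-only view would still reach the mutating methods.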
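To explain the second question further, here is a concrete scenario.
Example
[Figure: example scenario with stream A (Kafka, keyed), stream B (MySQL, broadcast), and downstream subtask-0 and subtask-1]
Consider the scenario in the figure above. Stream A reads data from Kafka and, after keyBy, yields a KeyedStream. Stream B reads data from MySQL to serve as the broadcast stream and yields a BroadcastStream. Stream B contains two records, flink and spark, which are broadcast to every downstream subtask, so subtask-0 and subtask-1 both hold the records flink and spark in their broadcast state. Now two records, flink and hive, are written to Kafka. After keyBy, flink is assigned to subtask-0 and hive to subtask-1. Clearly the flink record can be joined with the broadcast data, while the hive record cannot. Suppose the state were modified on the non-broadcast side, that is, on the stream A side, for example by adding flink and hive to the state. Each subtask would then apply only the writes for the records it received, so the broadcast state on subtask-0 and subtask-1 would become inconsistent. To guarantee that all parallel instances of the operator hold the same broadcast state, the design forbids modifying the state on the non-broadcast side.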
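The divergence in this scenario can be reproduced with a small simulation. The sketch below is illustrative only: BroadcastConsistencySketch and its methods are hypothetical, each subtask's broadcast state is a plain HashMap, and routing by hashCode modulo the parallelism is a simplifying assumption rather than Flink's actual key-group assignment.

```java
import java.util.HashMap;
import java.util.Map;

class BroadcastConsistencySketch {
    /** One parallel instance holding its own copy of the broadcast state. */
    static class Subtask {
        final Map<String, String> broadcastState = new HashMap<>();
    }

    private final Subtask[] subtasks;

    public BroadcastConsistencySketch(int parallelism) {
        subtasks = new Subtask[parallelism];
        for (int i = 0; i < parallelism; i++) {
            subtasks[i] = new Subtask();
        }
    }

    /** Broadcast side: every subtask receives the record, so every copy gets the same update. */
    public void broadcast(String key, String value) {
        for (Subtask subtask : subtasks) {
            subtask.broadcastState.put(key, value);
        }
    }

    /** Keyed side: a record reaches exactly one subtask (assumed hash routing). */
    public int route(String key) {
        return Math.floorMod(key.hashCode(), subtasks.length);
    }

    /** What would happen if the keyed side were allowed to write: only one copy changes. */
    public void hypotheticalKeyedSideWrite(String key, String value) {
        subtasks[route(key)].broadcastState.put(key, value);
    }

    public boolean copiesAreIdentical() {
        for (Subtask subtask : subtasks) {
            if (!subtask.broadcastState.equals(subtasks[0].broadcastState)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BroadcastConsistencySketch operator = new BroadcastConsistencySketch(2);
        operator.broadcast("flink", "from-mysql");
        operator.broadcast("spark", "from-mysql");
        System.out.println("after broadcast writes: " + operator.copiesAreIdentical());
        operator.hypotheticalKeyedSideWrite("hive", "from-kafka");
        System.out.println("after keyed-side write: " + operator.copiesAreIdentical());
    }
}
```

A write driven by a broadcast record is applied on every subtask, while a write driven by a keyed record lands on a single subtask, which is exactly the inconsistency the read-only context rules out.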
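Summary
Broadcast State is a special type of Operator State. It mainly addresses the scenario in which a low-throughput stream (a small amount of data) is joined with another, primary data stream. Broadcast state must be defined as a Map structure, and it can be modified only on the broadcast side; the non-broadcast side can read the state but cannot modify it. Broadcast state can only be kept in heap memory, so when you use it you need to give the TaskManager (TM) enough memory. This article has explained, from the source code, why Flink is designed this way, which should give you a deeper understanding of broadcast stream state.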