Flink 參數(shù)配置和常見參數(shù)調(diào)優(yōu)
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

Flink參數(shù)配置
jobmanger.rpc.address jm的地址。
jobmanager.rpc.port jm的端口號。
jobmanager.heap.mb jm的堆內(nèi)存大小。不建議配的太大,1-2G足夠。
taskmanager.heap.mb tm的堆內(nèi)存大小。大小視任務(wù)量而定。需要存儲任務(wù)的中間值,網(wǎng)絡(luò)緩存,用戶數(shù)據(jù)等。
taskmanager.numberOfTaskSlots slot數(shù)量。在yarn模式使用的時(shí)候會受到
yarn.scheduler.maximum-allocation-vcores值的影響。此處指定的slot數(shù)量如果超過yarn的maximum-allocation-vcores,flink啟動(dòng)會報(bào)錯(cuò)。在yarn模式,flink啟動(dòng)的task manager個(gè)數(shù)可以參照如下計(jì)算公式:
num_of_tm = ceil(parallelism / slot) ?即并行度除以slot個(gè)數(shù),結(jié)果向上取整。
parallelsm.default 任務(wù)默認(rèn)并行度,如果任務(wù)未指定并行度,將采用此設(shè)置。
web.port Flink web ui的端口號。
jobmanager.archive.fs.dir 將已完成的任務(wù)歸檔存儲的目錄。
history.web.port 基于web的history server的端口號。
historyserver.archive.fs.dir history server的歸檔目錄。該配置必須包含
jobmanager.archive.fs.dir配置的目錄,以便history server能夠讀取到已完成的任務(wù)信息。historyserver.archive.fs.refresh-interval 刷新存檔作業(yè)目錄時(shí)間間隔
state.backend 存儲和檢查點(diǎn)的后臺存儲。可選值為rocksdb filesystem hdfs。
state.backend.fs.checkpointdir 檢查點(diǎn)數(shù)據(jù)文件和元數(shù)據(jù)的默認(rèn)目錄。
state.checkpoints.dir 保存檢查點(diǎn)目錄。
state.savepoints.dir save point的目錄。
state.checkpoints.num-retained 保留最近檢查點(diǎn)的數(shù)量。
state.backend.incremental 增量存儲。
akka.ask.timeout Job Manager和Task Manager通信連接的超時(shí)時(shí)間。如果網(wǎng)絡(luò)擁擠經(jīng)常出現(xiàn)超時(shí)錯(cuò)誤,可以增大該配置值。
akka.watch.heartbeat.interval 心跳發(fā)送間隔,用來檢測task manager的狀態(tài)。
akka.watch.heartbeat.pause 如果超過該時(shí)間仍未收到task manager的心跳,該task manager 會被認(rèn)為已掛掉。
taskmanager.network.memory.max 網(wǎng)絡(luò)緩沖區(qū)最大內(nèi)存大小。
taskmanager.network.memory.min 網(wǎng)絡(luò)緩沖區(qū)最小內(nèi)存大小。
taskmanager.network.memory.fraction 網(wǎng)絡(luò)緩沖區(qū)使用的內(nèi)存占據(jù)總JVM內(nèi)存的比例。如果配置了
taskmanager.network.memory.max和taskmanager.network.memory.min,本配置項(xiàng)會被覆蓋。fs.hdfs.hadoopconf hadoop配置文件路徑(已被廢棄,建議使用HADOOP_CONF_DIR環(huán)境變量)
yarn.application-attempts job失敗嘗試次數(shù),主要是指job manager的重啟嘗試次數(shù)。該值不應(yīng)該超過
yarn-site.xml中的yarn.resourcemanager.am.max-attemps的值。
Flink HA(Job Manager)的配置
high-availability: zookeeper 使用zookeeper負(fù)責(zé)HA實(shí)現(xiàn)
high-availability.zookeeper.path.root: /flink flink信息在zookeeper存儲節(jié)點(diǎn)的名稱
high-availability.zookeeper.quorum: zk1,zk2,zk3 zookeeper集群節(jié)點(diǎn)的地址和端口
high-availability.storageDir:?hdfs://nameservice/flink/ha/?job manager元數(shù)據(jù)在文件系統(tǒng)儲存的位置,zookeeper僅保存了指向該目錄的指針。
Flink metrics 監(jiān)控相關(guān)配置
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260
Kafka相關(guān)調(diào)優(yōu)配置
linger.ms/batch.size 這兩個(gè)配置項(xiàng)配合使用,可以在吞吐量和延遲中得到最佳的平衡點(diǎn)。batch.size是kafka producer發(fā)送數(shù)據(jù)的批量大小,當(dāng)數(shù)據(jù)量達(dá)到batch size的時(shí)候,會將這批數(shù)據(jù)發(fā)送出去,避免了數(shù)據(jù)一條一條的發(fā)送,頻繁建立和斷開網(wǎng)絡(luò)連接。但是如果數(shù)據(jù)量比較小,導(dǎo)致遲遲不能達(dá)到batch.size,為了保證延遲不會過大,kafka不能無限等待數(shù)據(jù)量達(dá)到batch.size的時(shí)候才發(fā)送。為了解決這個(gè)問題,引入了
linger.ms配置項(xiàng)。當(dāng)數(shù)據(jù)在緩存中的時(shí)間超過linger.ms時(shí),無論緩存中數(shù)據(jù)是否達(dá)到批量大小,都會被強(qiáng)制發(fā)送出去。
ack 數(shù)據(jù)源是否需要kafka得到確認(rèn)。all表示需要收到所有ISR節(jié)點(diǎn)的確認(rèn)信息,1表示只需要收到kafka leader的確認(rèn)信息,0表示不需要任何確認(rèn)信息。該配置項(xiàng)需要對數(shù)據(jù)精準(zhǔn)性和延遲吞吐量做出權(quán)衡。
Kafka topic分區(qū)數(shù)和Flink并行度的關(guān)系
Flink kafka source的并行度需要和kafka topic的分區(qū)數(shù)一致。最大化利用kafka多分區(qū)topic的并行讀取能力。
Yarn相關(guān)調(diào)優(yōu)配置
yarn.scheduler.maximum-allocation-vcores
yarn.scheduler.minimum-allocation-vcores
Flink單個(gè)task manager的slot數(shù)量必須介于這兩個(gè)值之間
yarn.scheduler.maximum-allocation-mb
yarn.scheduler.minimum-allocation-mb
Flink的job manager 和task manager內(nèi)存不得超過container最大分配內(nèi)存大小。
yarn.nodemanager.resource.cpu-vcores yarn的虛擬CPU內(nèi)核數(shù),建議設(shè)置為物理CPU核心數(shù)的2-3倍,如果設(shè)置過少,會導(dǎo)致CPU資源無法被充分利用,跑任務(wù)的時(shí)候CPU占用率不高。
文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??



