圖文講解Kafka如何應(yīng)用NIO實(shí)現(xiàn)網(wǎng)絡(luò)通信
? 點(diǎn)擊上方“JavaEdge”,關(guān)注公眾號(hào)

網(wǎng)絡(luò)通信層

Kafka網(wǎng)絡(luò)通信組成

SocketServer
核心,主要實(shí)現(xiàn)了Reactor模式,用于處理外部多個(gè)Clients(廣義Clients,可能包含Producer、Consumer或其他Broker)的并發(fā)請(qǐng)求,并負(fù)責(zé)將處理結(jié)果封裝進(jìn)Response中,返還給Clients。SocketServer是Kafka網(wǎng)絡(luò)通信層中最重要的子模塊。它的Acceptor線程、Processor線程和RequestChannel等對(duì)象,都是實(shí)施網(wǎng)絡(luò)通信的重要組成部分。
KafkaRequestHandlerPool
I/O線程池,里面定義了若干個(gè)I/O線程,用于執(zhí)行真實(shí)的請(qǐng)求處理邏輯。KafkaRequestHandlerPool線程池定義了多個(gè)KafkaRequestHandler線程,而KafkaRequestHandler線程是真正處理請(qǐng)求邏輯的地方。
兩者共通處在于SocketServer中定義的RequestChannel對(duì)象和Processor線程。在代碼中,線程本質(zhì)都是Runnable類型,不管是Acceptor類、Processor還是KafkaRequestHandler類。
相較于KafkaRequestHandler,Acceptor和Processor最多算請(qǐng)求和響應(yīng)的“搬運(yùn)工”。

SocketServer


AbstractServerThread類 這是Acceptor線程和Processor線程的抽象基類
Acceptor線程類 接收和創(chuàng)建外部TCP連接的線程。每個(gè)SocketServer實(shí)例只會(huì)創(chuàng)建一個(gè)Acceptor線程。唯一作用創(chuàng)建連接,并將接收到的Request傳遞給下游的Processor線程。
Processor線程類 每個(gè)SocketServer實(shí)例默認(rèn)創(chuàng)建若干個(gè)(num.network.threads)Processor線程。負(fù)責(zé)
將接收到的Request添加到RequestChannel的Request隊(duì)列
將Response返還給Request發(fā)送方
Processor伴生對(duì)象類 僅定義一些與Processor線程相關(guān)的常見(jiàn)監(jiān)控指標(biāo)和常量等,如Processor線程空閑率等。
ConnectionQuotas類?

TooManyConnectionsException類?

SocketServer類 實(shí)現(xiàn)了對(duì)以上所有組件的管理和操作,如創(chuàng)建和關(guān)閉Acceptor、Processor線程。
SocketServer伴生對(duì)象類 定義了一些有用的常量,同時(shí)明確了SocketServer組件中的哪些參數(shù)是允許動(dòng)態(tài)修改的。
Acceptor線程

經(jīng)典Reactor模式的Dispatcher接收外部請(qǐng)求并分發(fā)給下面的實(shí)際處理線程。在Kafka中,這個(gè)Dispatcher就是Acceptor線程。

參數(shù)

endPoint
定義的Kafka Broker連接信息,比如PLAINTEXT://localhost:9092?
sendBufferSize

recvBufferSize

如果在你的生產(chǎn)環(huán)境中,Clients與Broker的通信網(wǎng)絡(luò)延遲很大(RTT>10ms),推薦增加控制緩沖區(qū)大小的兩個(gè)參數(shù):sendBufferSize和recvBufferSize,一般默認(rèn)值100KB太小了。

Acceptor線程的自定義屬性

nioSelector Java NIO庫(kù)的Selector對(duì)象實(shí)例,也是后續(xù)所有網(wǎng)絡(luò)通信組件實(shí)現(xiàn)Java NIO機(jī)制的基礎(chǔ)
processors 網(wǎng)絡(luò)Processor線程池。Acceptor線程在初始化時(shí),需要?jiǎng)?chuàng)建對(duì)應(yīng)的網(wǎng)絡(luò)Processor線程池。Processor線程是在Acceptor線程中管理和維護(hù)的。

Processor相關(guān)API

addProcessors?
?
removeProcessors?
于是Acceptor類就具備Processor線程池管理功能。Acceptor類的run方法 - 處理Reactor模式中分發(fā)?

Acceptor線程會(huì)先為每個(gè)入站請(qǐng)求確定要處理它的Processor線程
Acceptor線程使用Java NIO的Selector、SocketChannel循環(huán)輪詢就緒的I/O事件(SelectionKey.OP_ACCEPT)。一旦接收到外部連接請(qǐng)求,Acceptor就指定一個(gè)Processor線程,并將該請(qǐng)求交由它,讓它創(chuàng)建真正的網(wǎng)絡(luò)連接。
Processor線程

源碼?

執(zhí)行流程?

每個(gè)Processor線程在創(chuàng)建時(shí)都會(huì)創(chuàng)建3個(gè)隊(duì)列:可能是阻塞隊(duì)列,也可能是一個(gè)Map對(duì)象
newConnections
?每當(dāng)Processor線程接收新連接請(qǐng)求,都會(huì)將對(duì)應(yīng)SocketChannel放入該隊(duì)列。之后調(diào)用configureNewConnections創(chuàng)建連接時(shí),就從該隊(duì)列中取出SocketChannel,然后注冊(cè)新連接。
inflightResponses
臨時(shí)Response隊(duì)列?

為何是臨時(shí)?有些Response回調(diào)邏輯要在Response被返回發(fā)送方后,才能執(zhí)行,因此需要暫存臨時(shí)隊(duì)列。
responseQueue
每個(gè)Processor線程都會(huì)維護(hù)自己的Response隊(duì)列, 而非像網(wǎng)上的某些文章說(shuō)Response隊(duì)列是線程共享的或是保存在RequestChannel中的。Response隊(duì)列里面保存著需要被返還給發(fā)送方的所有Response對(duì)象。
工作邏輯
configureNewConnections
負(fù)責(zé)處理新連接請(qǐng)求,注意每個(gè)Processor線程都維護(hù)著一個(gè)Selector類實(shí)例。?

processNewResponses
負(fù)責(zé)發(fā)送Response給Request發(fā)送方,并且將Response放入臨時(shí)Response隊(duì)列?

poll

processCompletedReceives
接收和處理Request?

processCompletedSends

processDisconnected

closeExcessConnections
關(guān)閉超限連接?

往期推薦

目前交流群已有?800+人,旨在促進(jìn)技術(shù)交流,可關(guān)注公眾號(hào)添加筆者微信邀請(qǐng)進(jìn)群
喜歡文章,點(diǎn)個(gè)“在看、點(diǎn)贊、分享”素質(zhì)三連支持一下~
