<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          圖文講解Kafka如何應(yīng)用NIO實(shí)現(xiàn)網(wǎng)絡(luò)通信

          共 2812字,需瀏覽 6分鐘

           ·

          2021-01-07 19:55


          ? 點(diǎn)擊上方“JavaEdge”,關(guān)注公眾號(hào)

          設(shè)為“星標(biāo)”,好文章不錯(cuò)過(guò)!


          網(wǎng)絡(luò)通信層




          Kafka網(wǎng)絡(luò)通信組成



          • SocketServer

            核心,主要實(shí)現(xiàn)了Reactor模式,用于處理外部多個(gè)Clients(廣義Clients,可能包含Producer、Consumer或其他Broker)的并發(fā)請(qǐng)求,并負(fù)責(zé)將處理結(jié)果封裝進(jìn)Response中,返還給Clients。SocketServer是Kafka網(wǎng)絡(luò)通信層中最重要的子模塊。它的Acceptor線程、Processor線程和RequestChannel等對(duì)象,都是實(shí)施網(wǎng)絡(luò)通信的重要組成部分。

          • KafkaRequestHandlerPool

            I/O線程池,里面定義了若干個(gè)I/O線程,用于執(zhí)行真實(shí)的請(qǐng)求處理邏輯。KafkaRequestHandlerPool線程池定義了多個(gè)KafkaRequestHandler線程,而KafkaRequestHandler線程是真正處理請(qǐng)求邏輯的地方。


          兩者共通處在于SocketServer中定義的RequestChannel對(duì)象和Processor線程。在代碼中,線程本質(zhì)都是Runnable類型,不管是Acceptor類、Processor還是KafkaRequestHandler類。

          相較于KafkaRequestHandler,Acceptor和Processor最多算請(qǐng)求和響應(yīng)的“搬運(yùn)工”。


          SocketServer



          • AbstractServerThread類 這是Acceptor線程和Processor線程的抽象基類

          • Acceptor線程類 接收和創(chuàng)建外部TCP連接的線程。每個(gè)SocketServer實(shí)例只會(huì)創(chuàng)建一個(gè)Acceptor線程。唯一作用創(chuàng)建連接,并將接收到的Request傳遞給下游的Processor線程。

          • Processor線程類 每個(gè)SocketServer實(shí)例默認(rèn)創(chuàng)建若干個(gè)(num.network.threads)Processor線程。負(fù)責(zé)

            • 將接收到的Request添加到RequestChannel的Request隊(duì)列

            • 將Response返還給Request發(fā)送方

          • Processor伴生對(duì)象類 僅定義一些與Processor線程相關(guān)的常見(jiàn)監(jiān)控指標(biāo)和常量等,如Processor線程空閑率等。

          • ConnectionQuotas類?

          • TooManyConnectionsException類?

          • SocketServer類 實(shí)現(xiàn)了對(duì)以上所有組件的管理和操作,如創(chuàng)建和關(guān)閉Acceptor、Processor線程。

          • SocketServer伴生對(duì)象類 定義了一些有用的常量,同時(shí)明確了SocketServer組件中的哪些參數(shù)是允許動(dòng)態(tài)修改的。




          Acceptor線程



          經(jīng)典Reactor模式的Dispatcher接收外部請(qǐng)求并分發(fā)給下面的實(shí)際處理線程。在Kafka中,這個(gè)Dispatcher就是Acceptor線程。


          參數(shù)



          endPoint

          定義的Kafka Broker連接信息,比如PLAINTEXT://localhost:9092?

          sendBufferSize

          recvBufferSize

          如果在你的生產(chǎn)環(huán)境中,Clients與Broker的通信網(wǎng)絡(luò)延遲很大(RTT>10ms),推薦增加控制緩沖區(qū)大小的兩個(gè)參數(shù):sendBufferSize和recvBufferSize,一般默認(rèn)值100KB太小了。


          Acceptor線程的自定義屬性



          • nioSelector Java NIO庫(kù)的Selector對(duì)象實(shí)例,也是后續(xù)所有網(wǎng)絡(luò)通信組件實(shí)現(xiàn)Java NIO機(jī)制的基礎(chǔ)

          • processors 網(wǎng)絡(luò)Processor線程池。Acceptor線程在初始化時(shí),需要?jiǎng)?chuàng)建對(duì)應(yīng)的網(wǎng)絡(luò)Processor線程池。Processor線程是在Acceptor線程中管理和維護(hù)的。


          Processor相關(guān)API



          • addProcessors??

          • removeProcessors?于是Acceptor類就具備Processor線程池管理功能。

          • Acceptor類的run方法 - 處理Reactor模式中分發(fā)?

          Acceptor線程會(huì)先為每個(gè)入站請(qǐng)求確定要處理它的Processor線程

          Acceptor線程使用Java NIO的Selector、SocketChannel循環(huán)輪詢就緒的I/O事件(SelectionKey.OP_ACCEPT)。一旦接收到外部連接請(qǐng)求,Acceptor就指定一個(gè)Processor線程,并將該請(qǐng)求交由它,讓它創(chuàng)建真正的網(wǎng)絡(luò)連接。




          Processor線程



          • 源碼?

          • 執(zhí)行流程?

          • 每個(gè)Processor線程在創(chuàng)建時(shí)都會(huì)創(chuàng)建3個(gè)隊(duì)列:可能是阻塞隊(duì)列,也可能是一個(gè)Map對(duì)象

          newConnections

          ?每當(dāng)Processor線程接收新連接請(qǐng)求,都會(huì)將對(duì)應(yīng)SocketChannel放入該隊(duì)列。之后調(diào)用configureNewConnections創(chuàng)建連接時(shí),就從該隊(duì)列中取出SocketChannel,然后注冊(cè)新連接。

          inflightResponses

          • 臨時(shí)Response隊(duì)列?

          • 為何是臨時(shí)?有些Response回調(diào)邏輯要在Response被返回發(fā)送方后,才能執(zhí)行,因此需要暫存臨時(shí)隊(duì)列。

          responseQueue

          每個(gè)Processor線程都會(huì)維護(hù)自己的Response隊(duì)列, 而非像網(wǎng)上的某些文章說(shuō)Response隊(duì)列是線程共享的或是保存在RequestChannel中的。Response隊(duì)列里面保存著需要被返還給發(fā)送方的所有Response對(duì)象。

          工作邏輯

          configureNewConnections

          • 負(fù)責(zé)處理新連接請(qǐng)求,注意每個(gè)Processor線程都維護(hù)著一個(gè)Selector類實(shí)例。?

          processNewResponses

          • 負(fù)責(zé)發(fā)送Response給Request發(fā)送方,并且將Response放入臨時(shí)Response隊(duì)列?

          poll

          processCompletedReceives

          • 接收和處理Request?

          processCompletedSends

          processDisconnected

          closeExcessConnections

          • 關(guān)閉超限連接?


          往期推薦


          大廠如何解決數(shù)值精度/舍入/溢出問(wèn)題

          硬核干貨:HTTP超時(shí)、重復(fù)請(qǐng)求必見(jiàn)坑點(diǎn)及解決方案

          由于不知線程池的bug,某Java程序員叕被祭天

          程序員因重復(fù)記錄日志撐爆ELK被辭退!

          擁抱Kubernetes,再見(jiàn)了Spring Cloud




          目前交流群已有?800+人,旨在促進(jìn)技術(shù)交流,可關(guān)注公眾號(hào)添加筆者微信邀請(qǐng)進(jìn)群


          喜歡文章,點(diǎn)個(gè)“在看、點(diǎn)贊、分享”素質(zhì)三連支持一下~

          瀏覽 27
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产AV毛片 | 国产精品秘 久久久久久奇米影视 | 操骚逼影院 | 欧美日韩成人一区二区在线观看 | 天天插,天天狠,天天透 |