<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python 神器 Celery 源碼解析(2)

          共 17369字,需瀏覽 35分鐘

           ·

          2021-10-18 09:42

          花下貓語:今天分享的是@肖恩同學的解讀源碼系列,主角是大名鼎鼎的Celery。肖同學的系列文章在公眾號平臺里很罕見,他還曾分享過Flask、Werkzeug、SQLAlchemy、Requests、Gunicorn等知名庫的源碼解讀。如果你想在技術上更進一步的話,這些內(nèi)容是不錯的閱讀材料。因此,我誠心推薦你關注【游戲不存在】!

          Celery是一款非常簡單、靈活、可靠的分布式系統(tǒng),可用于處理大量消息,并且提供了一整套操作此系統(tǒng)的工具。Celery 也是一款消息隊列工具,可用于處理實時數(shù)據(jù)以及任務調(diào)度。

          幾周前,我們一起閱讀celery的源碼(點擊查看《第一篇》),學習了celery的工具之一,實現(xiàn)Promise功能的「vine庫」。這周我們一起看另外一個工具,負責AMQP協(xié)議中數(shù)據(jù)傳輸?shù)膒ython-amqp庫。它采用純python實現(xiàn)(支持cython擴展),可以通過它理解AMQP協(xié)議的細節(jié),打下celery的基礎,本文包括如下幾個部分:

          • py-amqp項目概述
          • 幀機制詳解
          • AMQP協(xié)議幀處理
          • AMQP使用
          • AMQP模型
          • 小結(jié)
          • 小技巧

          py-amqp項目概述

          py-amqp當前版本 5.0.6 ,主要代碼如下表:

          文件描述
          abstract_channel.py抽象的channel
          basic_message.pymessage消息實現(xiàn)
          channel.pychannel頻道實現(xiàn)
          connection.pyconnection連接實現(xiàn)
          exceptions.py異常
          method_framing.py幀解析方法
          platform.py運行平臺適配
          protocol.py協(xié)議對象
          sasl.pyssl認證相關實現(xiàn)
          serialization.py序列化相關實現(xiàn)
          spec.py協(xié)議規(guī)則定義
          transport.py通訊實現(xiàn)
          util.py工具類
          *.pxdCython的實現(xiàn),可以加速amqp

          項目主要包括2個功能:

          • AMQP協(xié)議的傳輸處理,包括字節(jié)流,幀和Message的序列化/反序列化
          • AMQP協(xié)議的Connection,Channel,Message三個基礎模型實現(xiàn)

          在正式開始之前,我們需要先簡單了解一下AMQP協(xié)議:

          高級消息隊列協(xié)議即Advanced Message Queuing Protocol(AMQP)是面向消息中間件提供的開放的應用層協(xié)議,其設計目標是對于消息的排序、路由(包括點對點和訂閱-發(fā)布)、保持可靠性、保證安全性[1]。AMQP規(guī)范了消息傳遞方和接收方的行為,以使消息在不同的提供商之間實現(xiàn)互操作性,就像SMTP,HTTP,F(xiàn)TP等協(xié)議可以創(chuàng)建交互系統(tǒng)一樣。

          高級消息隊列協(xié)議是一種二進制應用層協(xié)議,用于應對廣泛的面向消息應用程序的支持。協(xié)議提供了消息流控制,保證的一個消息對象的傳遞過程,如至多一次、保證多次、僅有一次等,和基于SASL和TLS的身份驗證和消息加密。

          文字比較難懂,結(jié)合下圖,消息如何從生產(chǎn)者傳遞到消費者的過程,應該就可以理解AMQP:

          hello-world-example-routing

          上圖是使用RabbitMQ實現(xiàn)的,RabbitMQ是一個開源的消息中間件,最早實現(xiàn)了AMQP協(xié)議,也是celery的默認消息中間件。強烈建議對AMQP協(xié)議不熟悉的朋友先閱讀一下參考鏈接中的: 「AMQP 0-9-1 Model Explained」。我摘錄了channel和message部分內(nèi)容如下:

          某些應用程序需要多個連接到代理。但是,同時保持許多 TCP 連接打開是不可取的,因為這樣做會消耗系統(tǒng)資源并且使配置防火墻更加困難。AMQP 0-9-1 連接與可以被認為是“共享單個 TCP 連接的輕量級連接”的通道復用。

          客戶端執(zhí)行的每個協(xié)議操作都發(fā)生在通道上。特定通道上的通信與另一個通道上的通信完全分開,因此每個協(xié)議方法還攜帶一個通道 ID(也稱為通道號),這是一個整數(shù),代理和客戶端都使用它來確定該方法適用于哪個通道。通道僅存在于連接的上下文中,而不會單獨存在。當連接關閉時,其上的所有通道也關閉。

          對于使用多個線程/進程進行處理的應用程序,為每個線程/進程打開一個新通道而不在它們之間共享通道是很常見的。

          AMQP 0-9-1 模型中的消息具有屬性。有些屬性非常常見,以至于 AMQP 0-9-1 規(guī)范定義了它們,應用程序開發(fā)人員不必考慮確切的屬性名稱。一些例子是:

          • 內(nèi)容類型 Content type
          • 內(nèi)容編碼 Content encoding
          • 路由鍵 Routing key
          • 交付模式(持續(xù)與否)Delivery mode (persistent or not)
          • 消息優(yōu)先級 Message priority
          • 消息發(fā)布時間戳 Message publishing timestamp
          • 有效期 Expiration period
          • 發(fā)布者應用程序 ID Publisher application id

          AMQP 代理使用某些屬性,但大多數(shù)屬性都可以由接收它們的應用程序解釋。一些屬性是可選的,稱為headers。它們類似于 HTTP 中的 X-Header。消息屬性是在發(fā)布消息時設置的。

          幀機制詳解

          之前我介紹過Redis客戶端和服務端的通訊協(xié)議:RESP(Redis Serialization Protocol),鏈接在這里: 「Redis-py 源碼閱讀」 。當時介紹的不夠詳細,這里我嘗試通俗的介紹一下在TCP這種二進制流之上的構(gòu)建各種應用層協(xié)議的常用方法。

          我們知道TCP是基于字節(jié)流的傳輸層通信協(xié)議,你可以把它想像成下圖:

          +--------------------------------------------+
          |????????????????????????????????????????????|
          |...00010001110001101110101111001111010110...|
          |????????????????????????????????????????????|
          +--------------------------------------------+

          這里的數(shù)據(jù)都是由0和1組成,頭和尾的省略號表示還有很多數(shù)據(jù),這么多數(shù)據(jù)從左(服務端)流向右 (客戶端)。如果沒有額外的說明,我們無法從中獲取到有效的信息。類似一篇長文沒有標點一樣,沒法讀懂,就是一堆亂碼。要解決這個問題,一般有3種辦法:

          • 定長信息
          • 使用特定字符分隔信息
          • 使用數(shù)據(jù)頭指定信息長度

          定長信息

          定長信息,類似下圖:

          +--------+--------+--------+--------+--------+
          |????????|????????|????????|????????|????????|
          |00100110|10000111|00111011|11010110|00001111|
          |????????|????????|????????|????????|????????|
          +--------+--------+--------+--------+--------+

          我們約定每個信息都是8位字符長度,這樣上面的數(shù)據(jù)可以得到5段有效信息,分別是:00100110,10000111...。定長信息的缺陷很明顯,如果信息大于8位需要截斷,如果小于8位則需要補齊。

          大家可以想象一下 00100110 是如何補齊?方法很簡單,位數(shù)補齊在前面,所以這里是用0補齊了2位。如果是在尾部進行補齊,就無法知道末尾的0是有效數(shù)據(jù)還是補齊的數(shù)據(jù)。

          我們可以使用天幕桿幫忙理解,這種工廠生產(chǎn)出來的東西,都有著一樣的長度:

          使用特定字符分隔信息

          也可以使用特定的間隔在數(shù)據(jù)流中區(qū)分信息,比如下圖。

          +--------------------------------------------+
          |????????????????????????????????????????????|
          |01100110101010?101010010101?1000010110101101|
          |????????????????????????????????????????????|
          +--------------------------------------------+

          這里使用空格 來區(qū)分上面的數(shù)據(jù),得到3段信息,分別是: 01100110101010 ...

          注意僅僅為了示意方便,二進制流中沒有空格,只有0010 0000

          我們可以把間隔理解成竹竿的竹節(jié),2個竹節(jié)之間就是一段。自然生長的竹節(jié),肯定是長短不一。

          使用分隔符方式的缺陷在于,效率比較低下,需要挨個判斷是否分隔符。

          使用數(shù)據(jù)頭指定信息長度

          數(shù)據(jù)頭就是給每個消息加一個描述消息長度的頭,比如下面:

          +--------------------------------------------+
          |????????????????????????????????????????????|
          |10110110100111010110111110101100011100011100|
          |????????????????????????????????????????????|
          +--------------------------------------------+

          1表示后面有1位數(shù)據(jù),0表示后面沒有數(shù)據(jù),所以上面的數(shù)據(jù)前面部分翻譯出來的信息就是0110 1001,對應ASCII的小寫字母i:

          1011011010011101011?#?流
          ?0?1??1??0??1?0?0?1?#?去除長度后的信息

          上面僅僅使用0和1模擬,會顯示的有點冗余。如果使用字符,就可以按照字符位數(shù)來定義。比如:

          +--------------------------------------------+
          |????????????????????????????????????????????|
          |30112101051111120010112113000210201211311111
          |????????????????????????????????????????????|
          +--------------------------------------------+

          30112101051111120010112113000210201211311111
          ?3???2??1?5?????2??1?1?2??3???2??2??2??3???1?#?長度
          ?011?10?0?11111?00?0?1?11?000?10?01?11?111?1

          同樣可以用生活中的燈串來理解數(shù)據(jù)頭,每個信息長度的大小,類似大小不等的燈泡,燈泡上標明了數(shù)據(jù)長度。

          所謂幀,在網(wǎng)絡中就是表示一個最小單元,所以我們使用上面3種方法都可以從流中區(qū)分出各個信息,也就是幀。實際應用中基本都是第3種方法或者混用2和3。比如http協(xié)議、RESP協(xié)議是分隔+數(shù)據(jù)頭的組合,AMQP協(xié)議也可以認為是此類。

          AMQP協(xié)議幀處理

          流的處理

          transport負責創(chuàng)建socket,并進行socket上的二進制流的讀和寫。讀的方法如下:

          #?ch23-celery/py-amqp-5.0.6/amqp/transport.py
          def?_read(self,?n,?initial=False,?_errnos=(errno.EAGAIN,?errno.EINTR)):
          ????"""Read?exactly?n?bytes?from?the?socket."""
          ????#?持續(xù)的讀取字節(jié)
          ????#?self.sock?=?socket.socket(af,?socktype,?proto)
          ????#?self._quick_recv?=?self.sock.recv
          ????recv?=?self._quick_recv
          ????#?字節(jié)緩存
          ????rbuf?=?self._read_buffer
          ????try:
          ????????while?len(rbuf)?????????????try:
          ????????????????#?讀取剩余字節(jié)
          ????????????????s?=?recv(n?-?len(rbuf))
          ????????????except?OSError?as?exc:
          ????????????????if?exc.errno?in?_errnos:
          ????????????????????if?initial?and?self.raise_on_initial_eintr:
          ????????????????????????raise?socket.timeout()
          ????????????????????continue
          ????????????????raise
          ????????????if?not?s:
          ????????????????raise?OSError('Server?unexpectedly?closed?connection')
          ????????????rbuf?+=?s
          ????except:??#?noqa
          ????????self._read_buffer?=?rbuf
          ????????raise
          ????#?多余的字節(jié)緩存住
          ????result,?self._read_buffer?=?rbuf[:n],?rbuf[n:]
          ????return?result

          寫的方法如下:

          #?ch23-celery/py-amqp-5.0.6/amqp/transport.py
          def?write(self,?s):
          ????try:
          ????????#?self._write?=?self.sock.sendall?
          ????????self._write(s)
          ????except?socket.timeout:
          ????????raise
          ????except?OSError?as?exc:
          ????????if?exc.errno?not?in?_UNAVAIL:
          ????????????self.connected?=?False
          ????????raise

          幀的處理

          二進制流的讀和寫一般沒有什么特別的,重點在如何從讀取的流中解析出幀信息。下面是AMQP中幀的讀取,也在transport中,主干如下:

          #?ch23-celery/py-amqp-5.0.6/amqp/transport.py
          def?read_frame(self,?unpack=unpack):
          ????"""Parse?AMQP?frame.

          ????Frame?has?following?format::

          ????????0??????1?????????3?????????7???????????????????size+7??????size+8
          ????????+------+---------+---------+???+-------------+???+-----------+
          ????????|?type?|?channel?|??size???|???|???payload???|???|?frame-end?|
          ????????+------+---------+---------+???+-------------+???+-----------+
          ?????????octet????short?????long????????'size'?octets????????octet

          ????"
          ""
          ????#?本地化方法,加快執(zhí)行效率
          ????read?=?self._read
          ????#?緩存buffer
          ????read_frame_buffer?=?bytes()
          ????...
          ????#?讀取幀頭7個字節(jié)
          ????frame_header?=?read(7,?True)
          ????read_frame_buffer?+=?frame_header
          ????#?解析幀頭(大端)(無符號)
          ????frame_type,?channel,?size?=?unpack('>BHI',?frame_header)
          ????#?讀取body
          ????payload?=?read(size)
          ????read_frame_buffer?+=?payload
          ????#?讀取尾部校驗碼
          ????frame_end?=?ord(read(1))
          ????...
          ????if?frame_end?==?206:
          ????????#?返回幀數(shù)據(jù)
          ????????return?frame_type,?channel,?payload
          ????...
          • AMQP的幀格式是幀頭+body+幀尾。
          • 幀頭由1個字節(jié)的幀類型+2個自己的channelID+4個字節(jié)的body長度組成。
          • 幀尾是1個字節(jié),正常情況下是0xce,對應的十進制就是206。
          • 使用unpack方法從二進制中獲取到信息

          Message的處理

          通過read_frame方法可以得到一個數(shù)據(jù)幀,這些幀又在method_framing中被組合成業(yè)務可用的Message:

          #?ch23-celery/py-amqp-5.0.6/amqp/method_framing.py
          def?frame_handler(connection,?callback,
          ??????????????????unpack_from=unpack_from,?content_methods=_CONTENT_METHODS):
          ????"""Create?closure?that?reads?frames."""
          ????#?使用閉包讀取frame
          ????#?字典默認值為1
          ????expected_types?=?defaultdict(lambda:?1)
          ????partial_messages?=?{}
          ????
          ????def?on_frame(frame):
          ????????#?幀類型?channelID?幀內(nèi)容
          ????????frame_type,?channel,?buf?=?frame
          ????????...
          ????????#?幀類型僅?1,2,3,8
          ????????if?frame_type?not?in?(expected_types[channel],?8):
          ????????????raise?UnexpectedFrame(
          ????????????????'Received?frame?{}?while?expecting?type:?{}'.format(
          ????????????????????frame_type,?expected_types[channel]),
          ????????????)
          ????????????
          ????????elif?frame_type?==?1:
          ????????????#?開始幀
          ????????????#?讀取2個整數(shù)
          ????????????method_sig?=?unpack_from('>HH',?buf,?0)
          ????????????#?三個類型是消息的開始?content_methods=[spec.Basic.Return?spec.Basic.Deliver?spec.Basic.GetOk]
          ????????????if?method_sig?in?content_methods:
          ????????????????#?Save?what?we've?got?so?far?and?wait?for?the?content-header
          ????????????????#?創(chuàng)建Message并以channel為key暫存
          ????????????????partial_messages[channel]?=?Message(
          ????????????????????frame_method=method_sig,?frame_args=buf,
          ????????????????)
          ????????????????expected_types[channel]?=?2
          ????????????????return?False
          ????????????????...
          ????????
          ????????elif?frame_type?==?2:
          ????????????#?頭幀
          ????????????#?從閉包中獲取Message
          ????????????msg?=?partial_messages[channel]
          ????????????#?附加header
          ????????????msg.inbound_header(buf)
          ?????
          ????????????if?not?msg.ready:
          ????????????????#?wait?for?the?content-body
          ????????????????#?未就緒,繼續(xù)等待body
          ????????????????expected_types[channel]?=?3
          ????????????????return?False
          ????????
          ????????elif?frame_type?==?3:
          ????????????#?內(nèi)容幀
          ????????????#?繼續(xù)從閉包中獲取Message
          ????????????msg?=?partial_messages[channel]
          ????????????#?附加body
          ????????????msg.inbound_body(buf)
          ????????????...
          ????????????#?重置channel等待下一個包
          ????????????expected_types[channel]?=?1
          ????????????#?清空通道的消息
          ????????????partial_messages.pop(channel,?None)
          ????????????#?執(zhí)行message的callback函數(shù)
          ????????????callback(channel,?msg.frame_method,?msg.frame_args,?msg)
          • 三個幀構(gòu)成一個Message(業(yè)務消息),分別的幀類型是開始幀1,頭幀2,內(nèi)容幀3
          • 開始幀上有frame_method和frame_args對應消息的處理方法
          • 頭幀上有消息的屬性,比如content_type,reply_to等,類似http頭
          • 內(nèi)容幀上就是消息的context

          寫入幀是讀取的逆過程,如下:

          #?ch23-celery/py-amqp-5.0.6/amqp/method_framing.py
          def?frame_writer(connection,?transport,
          ?????????????????pack=pack,?pack_into=pack_into,?range=range,?len=len,
          ?????????????????bytes=bytes,?str_to_bytes=str_to_bytes,?text_t=str):
          ????"""Create?closure?that?writes?frames."""
          ????#?輸出,也就是之前的self.sock.sendall方法
          ????write?=?transport.write

          ????buffer_store?=?Buffer(bytearray(connection.frame_max?-?8))
          ????def?write_frame(type_,?channel,?method_sig,?args,?content):
          ????????...
          ????????buf?=?buffer_store.buf
          ????????view?=?buffer_store.view
          ????????...
          ????????#?##?FAST:?pack?into?buffer?and?single?write
          ????????frame?=?(b''.join([pack('>HH',?*method_sig),?args])
          ?????????????????if?type_?==?1?else?b'')
          ????????framelen?=?len(frame)
          ????????#?第一幀
          ????????pack_into('>BHI%dsB'?%?framelen,?buf,?offset,
          ??????????????????type_,?channel,?framelen,?frame,?0xce)
          ????????offset?+=?8?+?framelen
          ????????if?body?is?not?None:
          ????????????frame?=?b''.join([
          ????????????????pack('>HHQ',?method_sig[0],?0,?len(body)),
          ????????????????properties,
          ????????????])
          ????????????framelen?=?len(frame)
          ????????????#?方法幀
          ????????????pack_into('>BHI%dsB'?%?framelen,?buf,?offset,
          ??????????????????????2,?channel,?framelen,?frame,?0xce)
          ????????????offset?+=?8?+?framelen

          ????????????bodylen?=?len(body)
          ????????????if?bodylen?>?0:
          ????????????????framelen?=?bodylen
          ????????????????#?內(nèi)容幀
          ????????????????pack_into('>BHI%dsB'?%?framelen,?buf,?offset,
          ??????????????????????????3,?channel,?framelen,?body,?0xce)
          ????????????????offset?+=?8?+?framelen

          ????????write(view[:offset])
          ????????...
          • 寫入的時候是準備好3個幀的二進制數(shù)據(jù)buf,一次性寫入到socket

          Message的序列化和反序列化,我們下一個環(huán)節(jié),數(shù)據(jù)模型部分再行介紹。

          amqp使用

          了解AMQP協(xié)議傳輸相關的細節(jié)后,我們還是先從使用方法進入py-amqp。生產(chǎn)者發(fā)送消息是這樣的:

          import?amqp

          with?amqp.Connection('broker.example.com')?as?c:
          ????ch?=?c.channel()
          ????ch.basic_publish(amqp.Message('Hello?World'),?routing_key='test')
          • 創(chuàng)建連接,并使用上下文包裹,這樣可以自動關閉連接
          • 從連接中創(chuàng)建channel
          • 使用channel發(fā)送消息,至少包括消息文本和route

          消費者消費消息是這樣的:

          import?amqp

          with?amqp.Connection('broker.example.com')?as?c:
          ????ch?=?c.channel()
          ????def?on_message(message):
          ????????print('Received?message?(delivery?tag:?{}):?{}'.format(message.delivery_tag,?message.body))
          ????ch.basic_consume(queue='test',?callback=on_message,?no_ack=True)
          ????while?True:
          ????????c.drain_events()
          • 創(chuàng)建連接,也使用上下文包裹
          • 一樣從連接中創(chuàng)建channel
          • 在channel上綁定消息的處理方法
          • 消費消息至少指定queue,queue和發(fā)送時候的route要一致。也可以設置是否ack。
          • 持續(xù)對連接進行事件監(jiān)聽

          從示例可知發(fā)送和接收都需要使用Connection和Channel,消息體都使用Message對象。不同的是發(fā)送的時候使用publish方法,接收會復雜一點需要持續(xù)監(jiān)聽事件和使用consume方法。

          AMQP模型

          Connection

          Connection主要有AbstractChannel基類和Connection類構(gòu)成,比較奇怪的是Connection和Channel都繼承自AbstractChannel。我個人覺得這種設計并不好,雖然可以通用Channel和Connection的一些操作。

          ????????+-----------------+
          ????????|?AbstractChannel?|
          ????????+-^-------------^-+
          ??????????|?????????????|
          ??????+---+?????????????|
          ??????|?????????????????|
          +-----+------+???????+--+------+
          |?Connection?|???????|?Channel?|
          +------------+???????+---------+

          Connection的構(gòu)造函數(shù):

          class?Connection(AbstractChannel):
          ????def?__init__(self,?host='localhost:5672',?userid='guest',?password='guest',
          ?????????????login_method=None,?login_response=None,
          ?????????????authentication=(),
          ?????????????virtual_host='/',?locale='en_US',?client_properties=None,
          ?????????????ssl=False,?connect_timeout=None,?channel_max=None,
          ?????????????frame_max=None,?heartbeat=0,?on_open=None,?on_blocked=None,
          ?????????????on_unblocked=None,?confirm_publish=False,
          ?????????????on_tune_ok=None,?read_timeout=None,?write_timeout=None,
          ?????????????socket_settings=None,?frame_handler=frame_handler,
          ?????????????frame_writer=frame_writer,?**kwargs):
          ????????self._connection_id?=?uuid.uuid4().hex
          ????????...
          ????????#?幀handler,讀取幀
          ????????self.frame_handler_cls?=?frame_handler
          ????????#?幀寫處理
          ????????self.frame_writer_cls?=?frame_writer

          ????????#?所有channel的字典
          ????????self.channels?=?{}
          ????????#?The?connection?object?itself?is?treated?as?channel?0
          ????????#?自己也是一個channel,ID是0,這樣可以把所有message的操作統(tǒng)一到channel上
          ????????super().__init__(self,?0)
          ????????...

          connection最首要的是管理數(shù)據(jù)傳輸,由connect函數(shù)實現(xiàn):

          def?connect(self,?callback=None):
          ????#?Let?the?transport.py?module?setup?the?actual
          ????#?socket?connection?to?the?broker.
          ????#
          ????if?self.connected:
          ????????return?callback()?if?callback?else?None
          ????try:
          ????????#?創(chuàng)建transport實例
          ????????self.transport?=?self.Transport(
          ????????????self.host,?self.connect_timeout,?self.ssl,
          ????????????self.read_timeout,?self.write_timeout,
          ????????????socket_settings=self.socket_settings,
          ????????)
          ????????self.transport.connect()
          ????????#?實例化讀和寫(因為讀和寫都是閉包)
          ????????self.on_inbound_frame?=?self.frame_handler_cls(
          ????????????self,?self.on_inbound_method)
          ????????self.frame_writer?=?self.frame_writer_cls(self,?self.transport)

          ????????...

          ????except?(OSError,?SSLError):
          ????????...

          connection還要負責一些連接相關的系統(tǒng)功能,比如連接狀態(tài)的維護:

          def?_setup_listeners(self):
          ????self._callbacks.update({
          ????????spec.Connection.Start:?self._on_start,
          ????????spec.Connection.OpenOk:?self._on_open_ok,
          ????????spec.Connection.Secure:?self._on_secure,
          ????????spec.Connection.Tune:?self._on_tune,
          ????????spec.Connection.Close:?self._on_close,
          ????????spec.Connection.Blocked:?self._on_blocked,
          ????????spec.Connection.Unblocked:?self._on_unblocked,
          ????????spec.Connection.CloseOk:?self._on_close_ok,
          ????})


          ef?_on_start(self,?version_major,?version_minor,?server_properties,
          ??????????????????mechanisms,?locales,?argsig='FsSs'):
          ????...
          ????#?處理服務端的spec.Connection.Start消息
          ????#?回應spec.Connection.StartOk到服務端
          ????self.send_method(
          ????????????spec.Connection.StartOk,?argsig,
          ????????????(client_properties,?authentication.mechanism,
          ?????????????login_response,?self.locale),
          ????????)
          ????...

          def?send_method(self,?sig,
          ????????????????format=None,?args=None,?content=None,
          ????????????????wait=None,?callback=None,?returns_tuple=False):
          ????p?=?promise()
          ????conn?=?self.connection
          ????...
          ????args?=?dumps(format,?args)?if?format?else?''
          ????try:
          ????????#?寫入數(shù)據(jù)
          ????????conn.frame_writer(1,?self.channel_id,?sig,?args,?content)
          ????except?StopIteration:
          ????????...

          ????#?TODO?temp:?callback?should?be?after?write_method?...?;)
          ????if?callback:
          ????????#?指向回調(diào)
          ????????p.then(callback)
          ????p()
          ????if?wait:
          ????????#?等待回應
          ????????return?self.wait(wait,?returns_tuple=returns_tuple)
          ????return?p
          • 客戶端收到服務端發(fā)來的spec.Connection.Start消息后,回應一個spec.Connection.StartOk消息

          通過connection創(chuàng)建Channel:

          Channel?=?Channel

          def?channel(self,?channel_id=None,?callback=None):
          ????"""Create?new?channel.

          ????Fetch?a?Channel?object?identified?by?the?numeric?channel_id,?or
          ????create?that?object?if?it?doesn't?already?exist.
          ????"
          ""
          ????...

          ????try:
          ????????#?channel_id?存在則從字典中獲取
          ????????return?self.channels[channel_id]
          ????except?KeyError:
          ????????#?不存在則新建一個channel實例
          ????????channel?=?self.Channel(self,?channel_id,?on_open=callback)
          ????????channel.open()
          ????????return?channel

          Channel

          Channel的構(gòu)造方法如下:

          class?Channel(AbstractChannel):
          ????
          ????def?__init__(self,?connection,
          ?????????????????channel_id=None,?auto_decode=True,?on_open=None):
          ????????...
          ????????#?新建channelID
          ????????channel_id?=?connection._get_free_channel_id()
          ????????#?指定自己的channelID
          ????????super().__init__(connection,?channel_id)
          ????????...
          ????????#?消息回調(diào)
          ????????self.callbacks?=?{}

          channel也需要初始化關于channel的系統(tǒng)調(diào)用,比如spec.Basic.Delive:

          def?_setup_listeners(self):
          ????self._callbacks.update({
          ????????spec.Channel.Close:?self._on_close,
          ????????spec.Channel.CloseOk:?self._on_close_ok,
          ????????spec.Channel.Flow:?self._on_flow,
          ????????spec.Channel.OpenOk:?self._on_open_ok,
          ????????spec.Basic.Cancel:?self._on_basic_cancel,
          ????????spec.Basic.CancelOk:?self._on_basic_cancel_ok,
          ????????spec.Basic.Deliver:?self._on_basic_deliver,
          ????????spec.Basic.Return:?self._on_basic_return,
          ????????spec.Basic.Ack:?self._on_basic_ack,
          ????????spec.Basic.Nack:?self._on_basic_nack,
          ????})
          ????
          def?_on_basic_deliver(self,?consumer_tag,?delivery_tag,?redelivered,
          ??????????????????????exchange,?routing_key,?msg):
          ????msg.channel?=?self
          ????#?投遞信息
          ????msg.delivery_info?=?{
          ????????'consumer_tag':?consumer_tag,
          ????????'delivery_tag':?delivery_tag,
          ????????'redelivered':?redelivered,
          ????????'exchange':?exchange,
          ????????'routing_key':?routing_key,
          ????}

          ????try:
          ????????fun?=?self.callbacks[consumer_tag]
          ????except?KeyError:
          ????????...
          ????else:
          ????????fun(msg)

          先看看消息如何投遞出去的:

          def?_basic_publish(self,?msg,?exchange='',?routing_key='',
          ???????????????????mandatory=False,?immediate=False,?timeout=None,
          ???????????????????confirm_timeout=None,
          ???????????????????argsig='Bssbb'):
          ????...
          ????try:
          ????????with?self.connection.transport.having_timeout(timeout):
          ????????????return?self.send_method(
          ????????????????spec.Basic.Publish,?argsig,
          ????????????????(0,?exchange,?routing_key,?mandatory,?immediate),?msg
          ????????????)
          ????except?socket.timeout:
          ????????...

          basic_publish?=?_basic_publish

          send_method在前面介紹spec.Connection.StartOk時候已經(jīng)有過介紹。

          消息的消費,需要先在connection保持監(jiān)聽:

          def?drain_events(self,?timeout=None):
          ????#?read?until?message?is?ready
          ????#?持續(xù)讀,直到讀取到message的ready狀態(tài)
          ????while?not?self.blocking_read(timeout):
          ????????pass

          def?blocking_read(self,?timeout=None):
          ????with?self.transport.having_timeout(timeout):
          ????????#?讀取幀
          ????????frame?=?self.transport.read_frame()
          ????#?處理幀
          ????return?self.on_inbound_frame(frame)????

          def?on_inbound_method(self,?channel_id,?method_sig,?payload,?content):
          ????#?on_inbound_frame的callback函數(shù)
          ????...
          ????#?交由對應的channel處理
          ????return?self.channels[channel_id].dispatch_method(
          ????????method_sig,?payload,?content,
          ????)

          channel對message處理就很簡單了,直到對應的listener,執(zhí)行l(wèi)istener

          def?dispatch_method(self,?method_sig,?payload,?content):
          ????...
          ????content.body?=?content.body.decode(content.content_encoding)
          ????...
          ????amqp_method?=?self._METHODS[method_sig]
          ????listeners?=?[self._callbacks[method_sig]]
          ????one_shot?=?self._pending.pop(method_sig)
          ????args?=?[]
          ????if?amqp_method.args:
          ????????args,?_?=?loads(amqp_method.args,?payload,?4)
          ????if?amqp_method.content:
          ????????args.append(content)

          ????for?listener?in?listeners:
          ????????listener(*args)

          ????...

          Message

          Message繼承自GenericContent:

          +----------------+
          |?GenericContent?|
          +-------+--------+
          ????????^
          ????????|
          ????????|
          ???+----+----+
          ???|?Message?|
          ???+---------+

          兩個類都是比較簡單的數(shù)據(jù)結(jié)構(gòu):

          class?Message(GenericContent):
          ????#?消息頭
          ????PROPERTIES?=?[
          ????????('content_type',?'s'),
          ????????('content_encoding',?'s'),
          ????????('application_headers',?'F'),
          ????????('delivery_mode',?'o'),
          ????????('priority',?'o'),
          ????????('correlation_id',?'s'),
          ????????('reply_to',?'s'),
          ????????('expiration',?'s'),
          ????????('message_id',?'s'),
          ????????('timestamp',?'L'),
          ????????('type',?'s'),
          ????????('user_id',?'s'),
          ????????('app_id',?'s'),
          ????????('cluster_id',?'s')
          ????]
          ????
          ????def?__init__(self,?body='',?children=None,?channel=None,?**properties):
          ????????super().__init__(**properties)
          ????????#:?set?by?basic_consume/basic_get
          ????????self.delivery_info?=?None
          ????????self.body?=?body
          ????????self.channel?=?channel

          class?GenericContent:
          ????"""Abstract?base?class?for?AMQP?content.

          ????Subclasses?should?override?the?PROPERTIES?attribute.
          ????"
          ""

          ????CLASS_ID?=?None
          ????PROPERTIES?=?[('dummy',?'s')]

          ????def?__init__(self,?frame_method=None,?frame_args=None,?**props):
          ????????self.frame_method?=?frame_method
          ????????self.frame_args?=?frame_args
          ????????#?消息頭
          ????????self.properties?=?props
          ????????self._pending_chunks?=?[]
          ????????self.body_received?=?0
          ????????self.body_size?=?0
          ????????self.ready?=?False

          ????def?__getattr__(self,?name):
          ????????#?Look?for?additional?properties?in?the?'properties'
          ????????#?dictionary,?and?if?present?-?the?'delivery_info'?dictionary.
          ????????...
          ????????if?name?in?self.properties:
          ????????????#?從properties中獲取
          ????????????return?self.properties[name]
          ????????...

          前文介紹的頭幀數(shù)據(jù),是這樣反序列化到Message中的:

          def?decode_properties_basic(buf,?offset):
          ????"""Decode?basic?properties."""
          ????properties?=?{}
          ????flags,?=?unpack_from('>H',?buf,?offset)
          ????offset?+=?2

          ????if?flags?&?0x8000:
          ????????slen,?=?unpack_from('>B',?buf,?offset)
          ????????offset?+=?1
          ????????properties['content_type']?=?pstr_t(buf[offset:offset?+?slen])
          ????????offset?+=?slen
          ????...
          ????
          def?_load_properties(self,?class_id,?buf,?offset):
          ????...
          ????props,?offset?=?PROPERTY_CLASSES[class_id](buf,?offset)
          ????self.properties?=?props
          ????return?offset
          ????????
          def?inbound_header(self,?buf,?offset=0):
          ????...
          ????self._load_properties(class_id,?buf,?offset)
          ????...?

          與反序列化對應的序列化方法主要是_serialize_properties實現(xiàn),就不在贅述。

          小結(jié)

          本篇文章,我們圍繞AMQP協(xié)議,理解在TCP的流上構(gòu)建應用協(xié)議的三種方法: 定長、間隔和數(shù)據(jù)頭 ;了解AMQP協(xié)議使用frame傳輸Message的方法: 使用開始幀,頭幀和內(nèi)容幀三個幀承載一個Message;了解AMQP中三個核心的概念: Connection, Channel和Message的實現(xiàn),以及如何使用這3個概念實現(xiàn)消息發(fā)送和消費。

          小技巧

          channel使用下面的方法生成遞增的不重復id:

          >>>?from?array?import?array
          >>>?a=array('H',?range(65535,?0,?-1))
          >>>?a.pop()
          1
          >>>?a.pop()
          2
          >>>

          一點題外話: 之前的文章,都叫源碼閱讀,主要覺得自己寫的還不夠。但是從搜索上看,源碼解析更符合直覺,個人感覺最近的文章也有點進步,所以厚顏從本期開始都改名叫源碼解析吧。

          參考鏈接

          • cpython文檔 https://cython.org/#about
          • amqp0-9-1協(xié)議 https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf
          • struct二進制數(shù)據(jù) https://docs.python.org/zh-cn/3/library/struct.html?highlight=struct#module-struct
          • AMQP協(xié)議學習 https://zhuanlan.zhihu.com/p/147675691
          • AMQP 0-9-1 Model Explained https://www.rabbitmq.com/tutorials/amqp-concepts.html
          Python貓技術交流群開放啦!群里既有國內(nèi)一二線大廠在職員工,也有國內(nèi)外高校在讀學生,既有十多年碼齡的編程老鳥,也有中小學剛剛?cè)腴T的新人,學習氛圍良好!想入群的同學,請在公號內(nèi)回復『交流群』,獲取貓哥的微信(謝絕廣告黨,非誠勿擾?。?/span>~


          還不過癮?試試它們




          王垠:編程的智慧

          如何用 Python 實現(xiàn)電腦端自動化?

          Python 3.10 有哪些最新特性 ?

          2021年,你應該知道的Python打包指南

          Python 之父為什么嫌棄 lambda 匿名函數(shù)?

          為什么爬蟲工程師應該有一些基本的后端常識?


          如果你覺得本文有幫助
          請慷慨分享點贊,感謝啦
          瀏覽 61
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  午夜福利无码电影 | 国产精品久久久久久无码牛牛章艳 | 国产成人精品视频在线 | 成人禁区| www,色婷婷 |