Python 神器 Celery 源碼解析(2)
Celery是一款非常簡單、靈活、可靠的分布式系統(tǒng),可用于處理大量消息,并且提供了一整套操作此系統(tǒng)的工具。Celery 也是一款消息隊列工具,可用于處理實時數(shù)據(jù)以及任務調(diào)度。
幾周前,我們一起閱讀celery的源碼(點擊查看《第一篇》),學習了celery的工具之一,實現(xiàn)Promise功能的「vine庫」。這周我們一起看另外一個工具,負責AMQP協(xié)議中數(shù)據(jù)傳輸?shù)膒ython-amqp庫。它采用純python實現(xiàn)(支持cython擴展),可以通過它理解AMQP協(xié)議的細節(jié),打下celery的基礎,本文包括如下幾個部分:
py-amqp項目概述 幀機制詳解 AMQP協(xié)議幀處理 AMQP使用 AMQP模型 小結(jié) 小技巧
py-amqp項目概述
py-amqp當前版本 5.0.6 ,主要代碼如下表:
| 文件 | 描述 |
|---|---|
| abstract_channel.py | 抽象的channel |
| basic_message.py | message消息實現(xiàn) |
| channel.py | channel頻道實現(xiàn) |
| connection.py | connection連接實現(xiàn) |
| exceptions.py | 異常 |
| method_framing.py | 幀解析方法 |
| platform.py | 運行平臺適配 |
| protocol.py | 協(xié)議對象 |
| sasl.py | ssl認證相關實現(xiàn) |
| serialization.py | 序列化相關實現(xiàn) |
| spec.py | 協(xié)議規(guī)則定義 |
| transport.py | 通訊實現(xiàn) |
| util.py | 工具類 |
| *.pxd | Cython的實現(xiàn),可以加速amqp |
項目主要包括2個功能:
AMQP協(xié)議的傳輸處理,包括字節(jié)流,幀和Message的序列化/反序列化 AMQP協(xié)議的Connection,Channel,Message三個基礎模型實現(xiàn)
在正式開始之前,我們需要先簡單了解一下AMQP協(xié)議:
高級消息隊列協(xié)議即Advanced Message Queuing Protocol(AMQP)是面向消息中間件提供的開放的應用層協(xié)議,其設計目標是對于消息的排序、路由(包括點對點和訂閱-發(fā)布)、保持可靠性、保證安全性[1]。AMQP規(guī)范了消息傳遞方和接收方的行為,以使消息在不同的提供商之間實現(xiàn)互操作性,就像SMTP,HTTP,F(xiàn)TP等協(xié)議可以創(chuàng)建交互系統(tǒng)一樣。
高級消息隊列協(xié)議是一種二進制應用層協(xié)議,用于應對廣泛的面向消息應用程序的支持。協(xié)議提供了消息流控制,保證的一個消息對象的傳遞過程,如至多一次、保證多次、僅有一次等,和基于SASL和TLS的身份驗證和消息加密。
文字比較難懂,結(jié)合下圖,消息如何從生產(chǎn)者傳遞到消費者的過程,應該就可以理解AMQP:

上圖是使用RabbitMQ實現(xiàn)的,RabbitMQ是一個開源的消息中間件,最早實現(xiàn)了AMQP協(xié)議,也是celery的默認消息中間件。強烈建議對AMQP協(xié)議不熟悉的朋友先閱讀一下參考鏈接中的: 「AMQP 0-9-1 Model Explained」。我摘錄了channel和message部分內(nèi)容如下:
某些應用程序需要多個連接到代理。但是,同時保持許多 TCP 連接打開是不可取的,因為這樣做會消耗系統(tǒng)資源并且使配置防火墻更加困難。AMQP 0-9-1 連接與可以被認為是“共享單個 TCP 連接的輕量級連接”的通道復用。
客戶端執(zhí)行的每個協(xié)議操作都發(fā)生在通道上。特定通道上的通信與另一個通道上的通信完全分開,因此每個協(xié)議方法還攜帶一個通道 ID(也稱為通道號),這是一個整數(shù),代理和客戶端都使用它來確定該方法適用于哪個通道。通道僅存在于連接的上下文中,而不會單獨存在。當連接關閉時,其上的所有通道也關閉。
對于使用多個線程/進程進行處理的應用程序,為每個線程/進程打開一個新通道而不在它們之間共享通道是很常見的。
AMQP 0-9-1 模型中的消息具有屬性。有些屬性非常常見,以至于 AMQP 0-9-1 規(guī)范定義了它們,應用程序開發(fā)人員不必考慮確切的屬性名稱。一些例子是:
內(nèi)容類型 Content type 內(nèi)容編碼 Content encoding 路由鍵 Routing key 交付模式(持續(xù)與否)Delivery mode (persistent or not) 消息優(yōu)先級 Message priority 消息發(fā)布時間戳 Message publishing timestamp 有效期 Expiration period 發(fā)布者應用程序 ID Publisher application id AMQP 代理使用某些屬性,但大多數(shù)屬性都可以由接收它們的應用程序解釋。一些屬性是可選的,稱為headers。它們類似于 HTTP 中的 X-Header。消息屬性是在發(fā)布消息時設置的。
幀機制詳解
之前我介紹過Redis客戶端和服務端的通訊協(xié)議:RESP(Redis Serialization Protocol),鏈接在這里: 「Redis-py 源碼閱讀」 。當時介紹的不夠詳細,這里我嘗試通俗的介紹一下在TCP這種二進制流之上的構(gòu)建各種應用層協(xié)議的常用方法。
我們知道TCP是基于字節(jié)流的傳輸層通信協(xié)議,你可以把它想像成下圖:
+--------------------------------------------+
|????????????????????????????????????????????|
|...00010001110001101110101111001111010110...|
|????????????????????????????????????????????|
+--------------------------------------------+
這里的數(shù)據(jù)都是由0和1組成,頭和尾的省略號表示還有很多數(shù)據(jù),這么多數(shù)據(jù)從左(服務端)流向右 (客戶端)。如果沒有額外的說明,我們無法從中獲取到有效的信息。類似一篇長文沒有標點一樣,沒法讀懂,就是一堆亂碼。要解決這個問題,一般有3種辦法:
定長信息 使用特定字符分隔信息 使用數(shù)據(jù)頭指定信息長度
定長信息
定長信息,類似下圖:
+--------+--------+--------+--------+--------+
|????????|????????|????????|????????|????????|
|00100110|10000111|00111011|11010110|00001111|
|????????|????????|????????|????????|????????|
+--------+--------+--------+--------+--------+
我們約定每個信息都是8位字符長度,這樣上面的數(shù)據(jù)可以得到5段有效信息,分別是:00100110,10000111...。定長信息的缺陷很明顯,如果信息大于8位需要截斷,如果小于8位則需要補齊。
大家可以想象一下
00100110是如何補齊?方法很簡單,位數(shù)補齊在前面,所以這里是用0補齊了2位。如果是在尾部進行補齊,就無法知道末尾的0是有效數(shù)據(jù)還是補齊的數(shù)據(jù)。
我們可以使用天幕桿幫忙理解,這種工廠生產(chǎn)出來的東西,都有著一樣的長度:

使用特定字符分隔信息
也可以使用特定的間隔在數(shù)據(jù)流中區(qū)分信息,比如下圖。
+--------------------------------------------+
|????????????????????????????????????????????|
|01100110101010?101010010101?1000010110101101|
|????????????????????????????????????????????|
+--------------------------------------------+
這里使用空格 來區(qū)分上面的數(shù)據(jù),得到3段信息,分別是: 01100110101010 ...
注意僅僅為了示意方便,二進制流中沒有空格,只有0010 0000
我們可以把間隔理解成竹竿的竹節(jié),2個竹節(jié)之間就是一段。自然生長的竹節(jié),肯定是長短不一。

使用分隔符方式的缺陷在于,效率比較低下,需要挨個判斷是否分隔符。
使用數(shù)據(jù)頭指定信息長度
數(shù)據(jù)頭就是給每個消息加一個描述消息長度的頭,比如下面:
+--------------------------------------------+
|????????????????????????????????????????????|
|10110110100111010110111110101100011100011100|
|????????????????????????????????????????????|
+--------------------------------------------+
1表示后面有1位數(shù)據(jù),0表示后面沒有數(shù)據(jù),所以上面的數(shù)據(jù)前面部分翻譯出來的信息就是0110 1001,對應ASCII的小寫字母i:
1011011010011101011?#?流
?0?1??1??0??1?0?0?1?#?去除長度后的信息
上面僅僅使用0和1模擬,會顯示的有點冗余。如果使用字符,就可以按照字符位數(shù)來定義。比如:
+--------------------------------------------+
|????????????????????????????????????????????|
|30112101051111120010112113000210201211311111
|????????????????????????????????????????????|
+--------------------------------------------+
30112101051111120010112113000210201211311111
?3???2??1?5?????2??1?1?2??3???2??2??2??3???1?#?長度
?011?10?0?11111?00?0?1?11?000?10?01?11?111?1
同樣可以用生活中的燈串來理解數(shù)據(jù)頭,每個信息長度的大小,類似大小不等的燈泡,燈泡上標明了數(shù)據(jù)長度。

所謂幀,在網(wǎng)絡中就是表示一個最小單元,所以我們使用上面3種方法都可以從流中區(qū)分出各個信息,也就是幀。實際應用中基本都是第3種方法或者混用2和3。比如http協(xié)議、RESP協(xié)議是分隔+數(shù)據(jù)頭的組合,AMQP協(xié)議也可以認為是此類。
AMQP協(xié)議幀處理
流的處理
transport負責創(chuàng)建socket,并進行socket上的二進制流的讀和寫。讀的方法如下:
#?ch23-celery/py-amqp-5.0.6/amqp/transport.py
def?_read(self,?n,?initial=False,?_errnos=(errno.EAGAIN,?errno.EINTR)):
????"""Read?exactly?n?bytes?from?the?socket."""
????#?持續(xù)的讀取字節(jié)
????#?self.sock?=?socket.socket(af,?socktype,?proto)
????#?self._quick_recv?=?self.sock.recv
????recv?=?self._quick_recv
????#?字節(jié)緩存
????rbuf?=?self._read_buffer
????try:
????????while?len(rbuf)?????????????try:
????????????????#?讀取剩余字節(jié)
????????????????s?=?recv(n?-?len(rbuf))
????????????except?OSError?as?exc:
????????????????if?exc.errno?in?_errnos:
????????????????????if?initial?and?self.raise_on_initial_eintr:
????????????????????????raise?socket.timeout()
????????????????????continue
????????????????raise
????????????if?not?s:
????????????????raise?OSError('Server?unexpectedly?closed?connection')
????????????rbuf?+=?s
????except:??#?noqa
????????self._read_buffer?=?rbuf
????????raise
????#?多余的字節(jié)緩存住
????result,?self._read_buffer?=?rbuf[:n],?rbuf[n:]
????return?result
寫的方法如下:
#?ch23-celery/py-amqp-5.0.6/amqp/transport.py
def?write(self,?s):
????try:
????????#?self._write?=?self.sock.sendall?
????????self._write(s)
????except?socket.timeout:
????????raise
????except?OSError?as?exc:
????????if?exc.errno?not?in?_UNAVAIL:
????????????self.connected?=?False
????????raise
幀的處理
二進制流的讀和寫一般沒有什么特別的,重點在如何從讀取的流中解析出幀信息。下面是AMQP中幀的讀取,也在transport中,主干如下:
#?ch23-celery/py-amqp-5.0.6/amqp/transport.py
def?read_frame(self,?unpack=unpack):
????"""Parse?AMQP?frame.
????Frame?has?following?format::
????????0??????1?????????3?????????7???????????????????size+7??????size+8
????????+------+---------+---------+???+-------------+???+-----------+
????????|?type?|?channel?|??size???|???|???payload???|???|?frame-end?|
????????+------+---------+---------+???+-------------+???+-----------+
?????????octet????short?????long????????'size'?octets????????octet
????"""
????#?本地化方法,加快執(zhí)行效率
????read?=?self._read
????#?緩存buffer
????read_frame_buffer?=?bytes()
????...
????#?讀取幀頭7個字節(jié)
????frame_header?=?read(7,?True)
????read_frame_buffer?+=?frame_header
????#?解析幀頭(大端)(無符號)
????frame_type,?channel,?size?=?unpack('>BHI',?frame_header)
????#?讀取body
????payload?=?read(size)
????read_frame_buffer?+=?payload
????#?讀取尾部校驗碼
????frame_end?=?ord(read(1))
????...
????if?frame_end?==?206:
????????#?返回幀數(shù)據(jù)
????????return?frame_type,?channel,?payload
????...
AMQP的幀格式是幀頭+body+幀尾。 幀頭由1個字節(jié)的幀類型+2個自己的channelID+4個字節(jié)的body長度組成。 幀尾是1個字節(jié),正常情況下是0xce,對應的十進制就是206。 使用unpack方法從二進制中獲取到信息
Message的處理
通過read_frame方法可以得到一個數(shù)據(jù)幀,這些幀又在method_framing中被組合成業(yè)務可用的Message:
#?ch23-celery/py-amqp-5.0.6/amqp/method_framing.py
def?frame_handler(connection,?callback,
??????????????????unpack_from=unpack_from,?content_methods=_CONTENT_METHODS):
????"""Create?closure?that?reads?frames."""
????#?使用閉包讀取frame
????#?字典默認值為1
????expected_types?=?defaultdict(lambda:?1)
????partial_messages?=?{}
????
????def?on_frame(frame):
????????#?幀類型?channelID?幀內(nèi)容
????????frame_type,?channel,?buf?=?frame
????????...
????????#?幀類型僅?1,2,3,8
????????if?frame_type?not?in?(expected_types[channel],?8):
????????????raise?UnexpectedFrame(
????????????????'Received?frame?{}?while?expecting?type:?{}'.format(
????????????????????frame_type,?expected_types[channel]),
????????????)
????????????
????????elif?frame_type?==?1:
????????????#?開始幀
????????????#?讀取2個整數(shù)
????????????method_sig?=?unpack_from('>HH',?buf,?0)
????????????#?三個類型是消息的開始?content_methods=[spec.Basic.Return?spec.Basic.Deliver?spec.Basic.GetOk]
????????????if?method_sig?in?content_methods:
????????????????#?Save?what?we've?got?so?far?and?wait?for?the?content-header
????????????????#?創(chuàng)建Message并以channel為key暫存
????????????????partial_messages[channel]?=?Message(
????????????????????frame_method=method_sig,?frame_args=buf,
????????????????)
????????????????expected_types[channel]?=?2
????????????????return?False
????????????????...
????????
????????elif?frame_type?==?2:
????????????#?頭幀
????????????#?從閉包中獲取Message
????????????msg?=?partial_messages[channel]
????????????#?附加header
????????????msg.inbound_header(buf)
?????
????????????if?not?msg.ready:
????????????????#?wait?for?the?content-body
????????????????#?未就緒,繼續(xù)等待body
????????????????expected_types[channel]?=?3
????????????????return?False
????????
????????elif?frame_type?==?3:
????????????#?內(nèi)容幀
????????????#?繼續(xù)從閉包中獲取Message
????????????msg?=?partial_messages[channel]
????????????#?附加body
????????????msg.inbound_body(buf)
????????????...
????????????#?重置channel等待下一個包
????????????expected_types[channel]?=?1
????????????#?清空通道的消息
????????????partial_messages.pop(channel,?None)
????????????#?執(zhí)行message的callback函數(shù)
????????????callback(channel,?msg.frame_method,?msg.frame_args,?msg)
三個幀構(gòu)成一個Message(業(yè)務消息),分別的幀類型是開始幀1,頭幀2,內(nèi)容幀3 開始幀上有frame_method和frame_args對應消息的處理方法 頭幀上有消息的屬性,比如content_type,reply_to等,類似http頭 內(nèi)容幀上就是消息的context
寫入幀是讀取的逆過程,如下:
#?ch23-celery/py-amqp-5.0.6/amqp/method_framing.py
def?frame_writer(connection,?transport,
?????????????????pack=pack,?pack_into=pack_into,?range=range,?len=len,
?????????????????bytes=bytes,?str_to_bytes=str_to_bytes,?text_t=str):
????"""Create?closure?that?writes?frames."""
????#?輸出,也就是之前的self.sock.sendall方法
????write?=?transport.write
????buffer_store?=?Buffer(bytearray(connection.frame_max?-?8))
????def?write_frame(type_,?channel,?method_sig,?args,?content):
????????...
????????buf?=?buffer_store.buf
????????view?=?buffer_store.view
????????...
????????#?##?FAST:?pack?into?buffer?and?single?write
????????frame?=?(b''.join([pack('>HH',?*method_sig),?args])
?????????????????if?type_?==?1?else?b'')
????????framelen?=?len(frame)
????????#?第一幀
????????pack_into('>BHI%dsB'?%?framelen,?buf,?offset,
??????????????????type_,?channel,?framelen,?frame,?0xce)
????????offset?+=?8?+?framelen
????????if?body?is?not?None:
????????????frame?=?b''.join([
????????????????pack('>HHQ',?method_sig[0],?0,?len(body)),
????????????????properties,
????????????])
????????????framelen?=?len(frame)
????????????#?方法幀
????????????pack_into('>BHI%dsB'?%?framelen,?buf,?offset,
??????????????????????2,?channel,?framelen,?frame,?0xce)
????????????offset?+=?8?+?framelen
????????????bodylen?=?len(body)
????????????if?bodylen?>?0:
????????????????framelen?=?bodylen
????????????????#?內(nèi)容幀
????????????????pack_into('>BHI%dsB'?%?framelen,?buf,?offset,
??????????????????????????3,?channel,?framelen,?body,?0xce)
????????????????offset?+=?8?+?framelen
????????write(view[:offset])
????????...
寫入的時候是準備好3個幀的二進制數(shù)據(jù)buf,一次性寫入到socket
Message的序列化和反序列化,我們下一個環(huán)節(jié),數(shù)據(jù)模型部分再行介紹。
amqp使用
了解AMQP協(xié)議傳輸相關的細節(jié)后,我們還是先從使用方法進入py-amqp。生產(chǎn)者發(fā)送消息是這樣的:
import?amqp
with?amqp.Connection('broker.example.com')?as?c:
????ch?=?c.channel()
????ch.basic_publish(amqp.Message('Hello?World'),?routing_key='test')
創(chuàng)建連接,并使用上下文包裹,這樣可以自動關閉連接 從連接中創(chuàng)建channel 使用channel發(fā)送消息,至少包括消息文本和route
消費者消費消息是這樣的:
import?amqp
with?amqp.Connection('broker.example.com')?as?c:
????ch?=?c.channel()
????def?on_message(message):
????????print('Received?message?(delivery?tag:?{}):?{}'.format(message.delivery_tag,?message.body))
????ch.basic_consume(queue='test',?callback=on_message,?no_ack=True)
????while?True:
????????c.drain_events()
創(chuàng)建連接,也使用上下文包裹 一樣從連接中創(chuàng)建channel 在channel上綁定消息的處理方法 消費消息至少指定queue,queue和發(fā)送時候的route要一致。也可以設置是否ack。 持續(xù)對連接進行事件監(jiān)聽
從示例可知發(fā)送和接收都需要使用Connection和Channel,消息體都使用Message對象。不同的是發(fā)送的時候使用publish方法,接收會復雜一點需要持續(xù)監(jiān)聽事件和使用consume方法。
AMQP模型
Connection
Connection主要有AbstractChannel基類和Connection類構(gòu)成,比較奇怪的是Connection和Channel都繼承自AbstractChannel。我個人覺得這種設計并不好,雖然可以通用Channel和Connection的一些操作。
????????+-----------------+
????????|?AbstractChannel?|
????????+-^-------------^-+
??????????|?????????????|
??????+---+?????????????|
??????|?????????????????|
+-----+------+???????+--+------+
|?Connection?|???????|?Channel?|
+------------+???????+---------+
Connection的構(gòu)造函數(shù):
class?Connection(AbstractChannel):
????def?__init__(self,?host='localhost:5672',?userid='guest',?password='guest',
?????????????login_method=None,?login_response=None,
?????????????authentication=(),
?????????????virtual_host='/',?locale='en_US',?client_properties=None,
?????????????ssl=False,?connect_timeout=None,?channel_max=None,
?????????????frame_max=None,?heartbeat=0,?on_open=None,?on_blocked=None,
?????????????on_unblocked=None,?confirm_publish=False,
?????????????on_tune_ok=None,?read_timeout=None,?write_timeout=None,
?????????????socket_settings=None,?frame_handler=frame_handler,
?????????????frame_writer=frame_writer,?**kwargs):
????????self._connection_id?=?uuid.uuid4().hex
????????...
????????#?幀handler,讀取幀
????????self.frame_handler_cls?=?frame_handler
????????#?幀寫處理
????????self.frame_writer_cls?=?frame_writer
????????#?所有channel的字典
????????self.channels?=?{}
????????#?The?connection?object?itself?is?treated?as?channel?0
????????#?自己也是一個channel,ID是0,這樣可以把所有message的操作統(tǒng)一到channel上
????????super().__init__(self,?0)
????????...
connection最首要的是管理數(shù)據(jù)傳輸,由connect函數(shù)實現(xiàn):
def?connect(self,?callback=None):
????#?Let?the?transport.py?module?setup?the?actual
????#?socket?connection?to?the?broker.
????#
????if?self.connected:
????????return?callback()?if?callback?else?None
????try:
????????#?創(chuàng)建transport實例
????????self.transport?=?self.Transport(
????????????self.host,?self.connect_timeout,?self.ssl,
????????????self.read_timeout,?self.write_timeout,
????????????socket_settings=self.socket_settings,
????????)
????????self.transport.connect()
????????#?實例化讀和寫(因為讀和寫都是閉包)
????????self.on_inbound_frame?=?self.frame_handler_cls(
????????????self,?self.on_inbound_method)
????????self.frame_writer?=?self.frame_writer_cls(self,?self.transport)
????????...
????except?(OSError,?SSLError):
????????...
connection還要負責一些連接相關的系統(tǒng)功能,比如連接狀態(tài)的維護:
def?_setup_listeners(self):
????self._callbacks.update({
????????spec.Connection.Start:?self._on_start,
????????spec.Connection.OpenOk:?self._on_open_ok,
????????spec.Connection.Secure:?self._on_secure,
????????spec.Connection.Tune:?self._on_tune,
????????spec.Connection.Close:?self._on_close,
????????spec.Connection.Blocked:?self._on_blocked,
????????spec.Connection.Unblocked:?self._on_unblocked,
????????spec.Connection.CloseOk:?self._on_close_ok,
????})
ef?_on_start(self,?version_major,?version_minor,?server_properties,
??????????????????mechanisms,?locales,?argsig='FsSs'):
????...
????#?處理服務端的spec.Connection.Start消息
????#?回應spec.Connection.StartOk到服務端
????self.send_method(
????????????spec.Connection.StartOk,?argsig,
????????????(client_properties,?authentication.mechanism,
?????????????login_response,?self.locale),
????????)
????...
def?send_method(self,?sig,
????????????????format=None,?args=None,?content=None,
????????????????wait=None,?callback=None,?returns_tuple=False):
????p?=?promise()
????conn?=?self.connection
????...
????args?=?dumps(format,?args)?if?format?else?''
????try:
????????#?寫入數(shù)據(jù)
????????conn.frame_writer(1,?self.channel_id,?sig,?args,?content)
????except?StopIteration:
????????...
????#?TODO?temp:?callback?should?be?after?write_method?...?;)
????if?callback:
????????#?指向回調(diào)
????????p.then(callback)
????p()
????if?wait:
????????#?等待回應
????????return?self.wait(wait,?returns_tuple=returns_tuple)
????return?p
客戶端收到服務端發(fā)來的spec.Connection.Start消息后,回應一個spec.Connection.StartOk消息
通過connection創(chuàng)建Channel:
Channel?=?Channel
def?channel(self,?channel_id=None,?callback=None):
????"""Create?new?channel.
????Fetch?a?Channel?object?identified?by?the?numeric?channel_id,?or
????create?that?object?if?it?doesn't?already?exist.
????"""
????...
????try:
????????#?channel_id?存在則從字典中獲取
????????return?self.channels[channel_id]
????except?KeyError:
????????#?不存在則新建一個channel實例
????????channel?=?self.Channel(self,?channel_id,?on_open=callback)
????????channel.open()
????????return?channel
Channel
Channel的構(gòu)造方法如下:
class?Channel(AbstractChannel):
????
????def?__init__(self,?connection,
?????????????????channel_id=None,?auto_decode=True,?on_open=None):
????????...
????????#?新建channelID
????????channel_id?=?connection._get_free_channel_id()
????????#?指定自己的channelID
????????super().__init__(connection,?channel_id)
????????...
????????#?消息回調(diào)
????????self.callbacks?=?{}
channel也需要初始化關于channel的系統(tǒng)調(diào)用,比如spec.Basic.Delive:
def?_setup_listeners(self):
????self._callbacks.update({
????????spec.Channel.Close:?self._on_close,
????????spec.Channel.CloseOk:?self._on_close_ok,
????????spec.Channel.Flow:?self._on_flow,
????????spec.Channel.OpenOk:?self._on_open_ok,
????????spec.Basic.Cancel:?self._on_basic_cancel,
????????spec.Basic.CancelOk:?self._on_basic_cancel_ok,
????????spec.Basic.Deliver:?self._on_basic_deliver,
????????spec.Basic.Return:?self._on_basic_return,
????????spec.Basic.Ack:?self._on_basic_ack,
????????spec.Basic.Nack:?self._on_basic_nack,
????})
????
def?_on_basic_deliver(self,?consumer_tag,?delivery_tag,?redelivered,
??????????????????????exchange,?routing_key,?msg):
????msg.channel?=?self
????#?投遞信息
????msg.delivery_info?=?{
????????'consumer_tag':?consumer_tag,
????????'delivery_tag':?delivery_tag,
????????'redelivered':?redelivered,
????????'exchange':?exchange,
????????'routing_key':?routing_key,
????}
????try:
????????fun?=?self.callbacks[consumer_tag]
????except?KeyError:
????????...
????else:
????????fun(msg)
先看看消息如何投遞出去的:
def?_basic_publish(self,?msg,?exchange='',?routing_key='',
???????????????????mandatory=False,?immediate=False,?timeout=None,
???????????????????confirm_timeout=None,
???????????????????argsig='Bssbb'):
????...
????try:
????????with?self.connection.transport.having_timeout(timeout):
????????????return?self.send_method(
????????????????spec.Basic.Publish,?argsig,
????????????????(0,?exchange,?routing_key,?mandatory,?immediate),?msg
????????????)
????except?socket.timeout:
????????...
basic_publish?=?_basic_publish
send_method在前面介紹spec.Connection.StartOk時候已經(jīng)有過介紹。
消息的消費,需要先在connection保持監(jiān)聽:
def?drain_events(self,?timeout=None):
????#?read?until?message?is?ready
????#?持續(xù)讀,直到讀取到message的ready狀態(tài)
????while?not?self.blocking_read(timeout):
????????pass
def?blocking_read(self,?timeout=None):
????with?self.transport.having_timeout(timeout):
????????#?讀取幀
????????frame?=?self.transport.read_frame()
????#?處理幀
????return?self.on_inbound_frame(frame)????
def?on_inbound_method(self,?channel_id,?method_sig,?payload,?content):
????#?on_inbound_frame的callback函數(shù)
????...
????#?交由對應的channel處理
????return?self.channels[channel_id].dispatch_method(
????????method_sig,?payload,?content,
????)
channel對message處理就很簡單了,直到對應的listener,執(zhí)行l(wèi)istener
def?dispatch_method(self,?method_sig,?payload,?content):
????...
????content.body?=?content.body.decode(content.content_encoding)
????...
????amqp_method?=?self._METHODS[method_sig]
????listeners?=?[self._callbacks[method_sig]]
????one_shot?=?self._pending.pop(method_sig)
????args?=?[]
????if?amqp_method.args:
????????args,?_?=?loads(amqp_method.args,?payload,?4)
????if?amqp_method.content:
????????args.append(content)
????for?listener?in?listeners:
????????listener(*args)
????...
Message
Message繼承自GenericContent:
+----------------+
|?GenericContent?|
+-------+--------+
????????^
????????|
????????|
???+----+----+
???|?Message?|
???+---------+
兩個類都是比較簡單的數(shù)據(jù)結(jié)構(gòu):
class?Message(GenericContent):
????#?消息頭
????PROPERTIES?=?[
????????('content_type',?'s'),
????????('content_encoding',?'s'),
????????('application_headers',?'F'),
????????('delivery_mode',?'o'),
????????('priority',?'o'),
????????('correlation_id',?'s'),
????????('reply_to',?'s'),
????????('expiration',?'s'),
????????('message_id',?'s'),
????????('timestamp',?'L'),
????????('type',?'s'),
????????('user_id',?'s'),
????????('app_id',?'s'),
????????('cluster_id',?'s')
????]
????
????def?__init__(self,?body='',?children=None,?channel=None,?**properties):
????????super().__init__(**properties)
????????#:?set?by?basic_consume/basic_get
????????self.delivery_info?=?None
????????self.body?=?body
????????self.channel?=?channel
class?GenericContent:
????"""Abstract?base?class?for?AMQP?content.
????Subclasses?should?override?the?PROPERTIES?attribute.
????"""
????CLASS_ID?=?None
????PROPERTIES?=?[('dummy',?'s')]
????def?__init__(self,?frame_method=None,?frame_args=None,?**props):
????????self.frame_method?=?frame_method
????????self.frame_args?=?frame_args
????????#?消息頭
????????self.properties?=?props
????????self._pending_chunks?=?[]
????????self.body_received?=?0
????????self.body_size?=?0
????????self.ready?=?False
????def?__getattr__(self,?name):
????????#?Look?for?additional?properties?in?the?'properties'
????????#?dictionary,?and?if?present?-?the?'delivery_info'?dictionary.
????????...
????????if?name?in?self.properties:
????????????#?從properties中獲取
????????????return?self.properties[name]
????????...
前文介紹的頭幀數(shù)據(jù),是這樣反序列化到Message中的:
def?decode_properties_basic(buf,?offset):
????"""Decode?basic?properties."""
????properties?=?{}
????flags,?=?unpack_from('>H',?buf,?offset)
????offset?+=?2
????if?flags?&?0x8000:
????????slen,?=?unpack_from('>B',?buf,?offset)
????????offset?+=?1
????????properties['content_type']?=?pstr_t(buf[offset:offset?+?slen])
????????offset?+=?slen
????...
????
def?_load_properties(self,?class_id,?buf,?offset):
????...
????props,?offset?=?PROPERTY_CLASSES[class_id](buf,?offset)
????self.properties?=?props
????return?offset
????????
def?inbound_header(self,?buf,?offset=0):
????...
????self._load_properties(class_id,?buf,?offset)
????...?
與反序列化對應的序列化方法主要是_serialize_properties實現(xiàn),就不在贅述。
小結(jié)
本篇文章,我們圍繞AMQP協(xié)議,理解在TCP的流上構(gòu)建應用協(xié)議的三種方法: 定長、間隔和數(shù)據(jù)頭 ;了解AMQP協(xié)議使用frame傳輸Message的方法: 使用開始幀,頭幀和內(nèi)容幀三個幀承載一個Message;了解AMQP中三個核心的概念: Connection, Channel和Message的實現(xiàn),以及如何使用這3個概念實現(xiàn)消息發(fā)送和消費。
小技巧
channel使用下面的方法生成遞增的不重復id:
>>>?from?array?import?array
>>>?a=array('H',?range(65535,?0,?-1))
>>>?a.pop()
1
>>>?a.pop()
2
>>>
一點題外話: 之前的文章,都叫源碼閱讀,主要覺得自己寫的還不夠。但是從搜索上看,源碼解析更符合直覺,個人感覺最近的文章也有點進步,所以厚顏從本期開始都改名叫源碼解析吧。
參考鏈接
cpython文檔 https://cython.org/#about amqp0-9-1協(xié)議 https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf struct二進制數(shù)據(jù) https://docs.python.org/zh-cn/3/library/struct.html?highlight=struct#module-struct AMQP協(xié)議學習 https://zhuanlan.zhihu.com/p/147675691 AMQP 0-9-1 Model Explained https://www.rabbitmq.com/tutorials/amqp-concepts.html

還不過癮?試試它們
