Python 神器 Celery 源碼解析(3)

Celery是一款非常簡(jiǎn)單、靈活、可靠的分布式系統(tǒng),可用于處理大量消息,并且提供了一整套操作此系統(tǒng)的工具。Celery 也是一款消息隊(duì)列工具,可用于處理實(shí)時(shí)數(shù)據(jù)以及任務(wù)調(diào)度。
本文是是celery源碼解析的第三篇,在前兩篇里分別介紹了vine和py-amqp:
本篇我們繼續(xù)celery的基礎(chǔ)庫(kù): kombu,一個(gè)python實(shí)現(xiàn)的消息庫(kù),在celery中承擔(dān)核心的消息處理流程。本文包括下面幾個(gè)部分:
AMQP協(xié)議 kombu概述 kombu使用指南 Producer && Consumer 解析 Exchange && Queue 解析 Message 解析 Connection 解析 Matcher && serialization 小結(jié) 小技巧
AMQP 概念
接上篇,我們繼續(xù)學(xué)習(xí)AMQP的相關(guān)概念。理解這些基礎(chǔ)概念對(duì)kombu為什么這樣實(shí)現(xiàn)很有幫助。這次我們用小故事來(lái)模擬kombu的消息處理流程。
小學(xué)三年級(jí)的小明同學(xué)喜歡同桌的小紅同學(xué),喜歡她的馬尾和笑容,經(jīng)常寫小紙條給她。這里小紙條就是Message,小明同學(xué)是Producer, 小紅同學(xué)是Consumer,這種直接投遞的方式是direct。有時(shí)候,小紅同學(xué)不在座位上,小明就把紙條放在她的抽屜里。抽屜就當(dāng)做Queue使用,臨時(shí)存放投遞的消息。老師發(fā)現(xiàn)小明和小紅上課經(jīng)常有小動(dòng)作后,棒打鴛鴦把他們分開了,他們不再是同桌。小明同學(xué)沒法忘記小紅的笑容,距離產(chǎn)生了更多的美,就拜托前面的小馬幫他遞小紙條,紙條封面上寫著“請(qǐng)給小紅”。小馬就是Exchange,小馬的前座也是Exchange,“請(qǐng)給小紅”就是消息的route-key。常在河邊走,哪有不濕腳。有次紙條被老師抓住,老師讓小明同學(xué)在講臺(tái)上把紙條的內(nèi)容講給大家聽。當(dāng)眾念小紙條這叫廣播, 也就是fanout。
幼稚的小故事也是一種真實(shí)的生活,誰(shuí)又沒有寫過(guò)小紙條呢,請(qǐng)暫停回憶一分鐘:) 。業(yè)務(wù)是生活場(chǎng)景的一種抽象,代碼又是更高層一點(diǎn)的抽象。理解業(yè)務(wù),就對(duì)代碼上的概念不發(fā)楞。
以上這些概念Exchange,Queue都是broker要實(shí)現(xiàn)的內(nèi)容。可是客戶端Producer/Consumer也包含,這是為什么呢?消息傳輸過(guò)程可不可以簡(jiǎn)化成一個(gè)客戶端只使用producer發(fā)送消息,另外一個(gè)客戶端只使用consumer消費(fèi)消息呢?這樣也不是不行,前提是AMQP協(xié)議中exchange和queue的創(chuàng)建及綁定,需要使用管理工具在broker先創(chuàng)建好,這無(wú)疑約束了AMPQ使用的靈活性。kombu中包含了Exchange,Queue模型,主要是用來(lái)對(duì)broker的管理。
kombu概述
kombu是植物家族的重要一員, 芹菜(celery)、葡萄藤(vine)、海帶(kombu)是快樂的一家人。我們解析kombu,采用的版本是?5.0.0, 主要模塊如下:
| 模塊 | 功能 |
|---|---|
| abstract.py | 抽象的綁定實(shí)現(xiàn),對(duì)象是否可以綁定到channel |
| compression.py | 壓縮算法的匯總 |
| connection.py | broker的連接 |
| entity.py | 實(shí)體類,包括Exchange,binding和Queue對(duì)象的實(shí)現(xiàn) |
| matcher.py | 匹配策略 |
| message.py | 消息對(duì)象,并且附帶消息的操作接口ack,reject等 |
| messaging.py | 消息處理,包括Producer和Consumer |
| mixins.py,pools.py,simple.py | 增強(qiáng)功能或者提升便捷使用的封裝 |
| serialization.py | 序列化算法的匯總 |
| transport | 對(duì)接各種存儲(chǔ)引擎的數(shù)據(jù)傳輸實(shí)現(xiàn),主要有內(nèi)存,redis,pyamqp(RabbitMQ) 等 |
| asynchronous | 異步實(shí)現(xiàn) |
kombu底層使用pyamqp提供的AMQP協(xié)議支持,并完成Producer,Consumer,Exchange,Queue等模型實(shí)現(xiàn)。
kombu 使用指南
老規(guī)矩,先從kombu的使用開始。下面是一個(gè)生產(chǎn)者發(fā)送消息的示例:
#?kombu-5.0.0/examples/complete_send.py
from?kombu?import?Connection,?Producer,?Exchange,?Queue
exchange?=?Exchange('kombu_demo',?type='direct')
with?Connection('amqp://guest:guest@localhost:5672//')?as?connection:
????producer?=?Producer(connection)
????#?消息需要使用exchange
????producer.publish({'hello':?'world'},
?????????????????????exchange=exchange,
?????????????????????routing_key='kombu_demo',
?????????????????????serializer='json',?compression='zlib')
生產(chǎn)者示例包括下面幾步:
創(chuàng)建名為kombu_demo的exchange 創(chuàng)建到broker的connection并使用其作為上下文 使用connection創(chuàng)建發(fā)送消息的producer 使用創(chuàng)建完成的producer發(fā)送普通的json消息到創(chuàng)建好的exchange,并且指明routing_key為kombu_demo。約定消息使用json序列化,zlib算法壓縮。
消費(fèi)者的示例會(huì)略微復(fù)雜一點(diǎn):
kombu-5.0.0/examples/complete_receive.py
from?pprint?import?pformat
from?kombu?import?Connection,?Exchange,?Queue,?Consumer,?eventloop
exchange?=?Exchange('kombu_demo',?type='direct')
queue?=?Queue('kombu_demo',?exchange,?routing_key='kombu_demo')
#?格式化函數(shù)
def?pretty(obj):
????return?pformat(obj,?indent=4)
#:?This?is?the?callback?applied?when?a?message?is?received.
def?handle_message(body,?message):
????print(f'Received?message:?{body!r}')
????print('??properties:\n{}'.format(pretty(message.properties)))
????print('??delivery_info:\n{}'.format(pretty(message.delivery_info)))
????message.ack()
with?Connection('amqp://guest:guest@localhost:5672//')?as?connection:
????with?Consumer(connection,?queue,?callbacks=[handle_message]):
????????for?_?in?eventloop(connection):
????????????pass
消費(fèi)者示例主要包括下面幾步:
同樣創(chuàng)建名為kombu_demo的exchange 創(chuàng)建名為kombu_demo的queue, 綁定到exchange,并且設(shè)置消費(fèi)的routing_key 創(chuàng)建callback函數(shù),接收body和message。body是純粹的業(yè)務(wù)信息,message則包含一些投遞信息,并且可以使用message直接執(zhí)行ack回應(yīng)給broker。 和生產(chǎn)者一樣,創(chuàng)建到broker的connection并使用其作為上下文 使用connection創(chuàng)建消費(fèi)者,消費(fèi)者需要綁定到queue,并且設(shè)置callback函數(shù) 持續(xù)監(jiān)聽connection上的事件循環(huán)
我們?cè)倩仡^看看下圖,對(duì)比一下示例,加強(qiáng)理解:

示例中的生產(chǎn)者位于圖的左半?yún)^(qū),消費(fèi)者位于圖的右半?yún)^(qū)。中間部分的broker,在文章的第一篇里,我們使用redis服務(wù)作為broker。示例還有重要的一點(diǎn)就是,全程沒有創(chuàng)建channel,都是自動(dòng)創(chuàng)建的。一般情況下,我們有3個(gè)進(jìn)程,Producer進(jìn)程和Consumer進(jìn)程通過(guò)Broker進(jìn)程進(jìn)行消息的處理,這是一個(gè)典型的分布式系統(tǒng)。
Producer && Consumer 解析
Proudcer解析
Proudcer的構(gòu)造函數(shù):
class?Producer:
????def?__init__(self,?channel,?exchange=None,?routing_key=None,
?????????????????serializer=None,?auto_declare=None,?compression=None,
?????????????????on_return=None):
????????self._channel?=?channel
????????self.exchange?=?exchange
????????self.routing_key?=?routing_key?or?self.routing_key
????????self.serializer?=?serializer?or?self.serializer
????????self.compression?=?compression?or?self.compression
????????self.on_return?=?on_return?or?self.on_return
????????self._channel_promise?=?None
????????if?self.exchange?is?None:
????????????#?默認(rèn)的exchange
????????????self.exchange?=?Exchange('')
????????...
????????if?self._channel:
????????????self.revive(self._channel)
????
????def?revive(self,?channel):
????????"""Revive?the?producer?after?connection?loss."""
????????if?is_connection(channel):
????????????connection?=?channel
????????????self.__connection__?=?connection
????????????channel?=?ChannelPromise(lambda:?connection.default_channel)
????????if?isinstance(channel,?ChannelPromise):
????????????self._channel?=?channel
????????????self.exchange?=?self.exchange(channel)
????????else:
????????????#?Channel?already?concrete
????????????self._channel?=?channel
????????????if?self.on_return:
????????????????self._channel.events['basic_return'].add(self.on_return)
????????????self.exchange?=?self.exchange(channel)
Producer除了設(shè)置自身的屬性外,還包括對(duì)channel的處理。前文介紹過(guò)connection也是channel的一種,這里要先處理好connection,然后再?gòu)腸onnection獲得默認(rèn)的channel。同時(shí)對(duì)于已經(jīng)成功的channel,則進(jìn)行將producer綁定到channel。self.exchange(channel)?等同于?self.exchange.__call__(channel)。producer創(chuàng)建完成后,可以通過(guò)publish方法發(fā)送消息:
def?publish(self,?body,?routing_key=None,?delivery_mode=None,
????????????????mandatory=False,?immediate=False,?priority=0,
????????????????content_type=None,?content_encoding=None,?serializer=None,
????????????????headers=None,?compression=None,?exchange=None,?retry=False,
????????????????retry_policy=None,?declare=None,?expiration=None,?timeout=None,
????????????????**properties):
????#?初始化routing-key,?exchange
????routing_key?=?self.routing_key?if?routing_key?is?None?else?routing_key
????exchange_name,?properties['delivery_mode']?=?self._delivery_details(
????????????exchange?or?self.exchange,?delivery_mode,
????????)
????#?準(zhǔn)備body和body類型,編碼
????body,?content_type,?content_encoding?=?self._prepare(
????????????body,?serializer,?content_type,?content_encoding,
????????????compression,?headers)
????
????#?使用message封裝body
????message?=?self.channel.prepare_message(
????????body,?priority,?content_type,
????????content_encoding,?headers,?properties,
????)
????...
????#?利用channel發(fā)送消息
????return?channel.basic_publish(
????????message,
????????exchange=exchange,?routing_key=routing_key,
????????mandatory=mandatory,?immediate=immediate,
????????timeout=timeout
????)
Producer是對(duì)channel的業(yè)務(wù)封裝,創(chuàng)建時(shí)候有channel則使用channel,沒有channel則使用connection的default_channel。Producer發(fā)送消息的過(guò)程,完成exchange和message包裝后,使用channel進(jìn)行發(fā)送。
Consumer解析
Consumer的構(gòu)造函數(shù)和上下文:
class?Consumer:
????
????def?__init__(self,?channel,?queues=None,?no_ack=None,?auto_declare=None,
?????????????????callbacks=None,?on_decode_error=None,?on_message=None,
?????????????????accept=None,?prefetch_count=None,?tag_prefix=None):
????????self.channel?=?channel
????????#?Queue的列表
????????self.queues?=?maybe_list(queues?or?[])
????????self.no_ack?=?self.no_ack?if?no_ack?is?None?else?no_ack
????????#?消息的回調(diào)函數(shù)
????????self.callbacks?=?(self.callbacks?or?[]?if?callbacks?is?None
??????????????????????????else?callbacks)
????????#?自定義的消息處理方法
????????self.on_message?=?on_message
????????self.tag_prefix?=?tag_prefix
????????self._active_tags?=?{}
????????...
????????if?self.channel:
????????????self.revive(self.channel)
????
????def?revive(self,?channel):
????????"""Revive?consumer?after?connection?loss."""
????????self._active_tags.clear()
????????channel?=?self.channel?=?maybe_channel(channel)
????????#?modify?dict?size?while?iterating?over?it?is?not?allowed
????????for?qname,?queue?in?list(self._queues.items()):
????????????#?name?may?have?changed?after?declare
????????????self._queues.pop(qname,?None)
????????????queue?=?self._queues[queue.name]?=?queue(self.channel)
????????????#?queue和channel綁定
????????????queue.revive(channel)
????????...
????
????def?__enter__(self):
????????self.consume()
????????return?self
Consumer和Producer類似,設(shè)置完屬性后也要處理好channel,不同的是其中的queue(在producer中是exchange)和channel綁定并提供一個(gè)上下文環(huán)境。在上下文環(huán)境中進(jìn)行消息消費(fèi):
def?consume(self,?no_ack=None):
????tag?=?self._add_tag(queue,?consumer_tag)
????#?每個(gè)queue消息消息
????for?queue?in?self._queues:
????????queue.consume(tag,?self._receive_callback,
??????????????????????????no_ack=no_ack,?nowait=nowait)
def?_receive_callback(self,?message):
????accept?=?self.accept
????on_m,?channel,?decoded?=?self.on_message,?self.channel,?None
????try:
????????...
????????#?消息反序列化
????????decoded?=?None?if?on_m?else?message.decode()
????except?Exception?as?exc:
????????if?not?self.on_decode_error:
????????????raise
????????self.on_decode_error(message,?exc)
????else:
????????return?on_m(message)?if?on_m?else?self.receive(decoded,?message)
def?receive(self,?body,?message):
????"""Method?called?when?a?message?is?received.
????This?dispatches?to?the?registered?:attr:`callbacks`.
????Arguments:
????????body?(Any):?The?decoded?message?body.
????????message?(~kombu.Message):?The?message?instance.
????Raises:
????????NotImplementedError:?If?no?consumer?callbacks?have?been
????????????registered.
????"""
????#?執(zhí)行callback
????callbacks?=?self.callbacks
????...
????#?默認(rèn)就是body和message回傳給業(yè)務(wù)函數(shù)
????[callback(body,?message)?for?callback?in?callbacks]
consumer可以使用多個(gè)queue,每個(gè)queue消費(fèi)消息的時(shí)候可以使用覆蓋處理函數(shù)或者使用系統(tǒng)的處理函數(shù)。一般情況下callback會(huì)獲得到解碼后的body和消息原文。如何持續(xù)的消費(fèi)消息,在connection部分再介紹。
Exchange && Queue 解析
producer需要使用exchange,consumer需要使用queue,消息是通過(guò)exchange和queue搭橋傳遞的。Exchange和Queue有共同的父類MaybeChannelBound:
??????????????+-------------------+
??????????????|?MaybeChannelBound?|
??????????????+-------^-----------+
??????????????????????|
?????+----------------+----------------+
?????|?????????????????????????????????|
+----+-----+???????????????????????+---+---+
|?Exchange?|???????????????????????|?Queue?|
+----------+???????????????????????+-------+
MaybeChannelBound約定了類對(duì)channel的綁定行為:
class?MaybeChannelBound(Object):
????
????_channel?=?None
????_is_bound?=?False
????
????def?__call__(self,?channel):
????????"""`self(channel)?->?self.bind(channel)`."""
????????return?self.bind(channel)
_channel 和 _is_bound 都是類屬性,可以知道channel在類上重用 __call__魔法函數(shù)讓類方法, 比如exchange(channel)和queue(channel)執(zhí)行的時(shí)候會(huì)自動(dòng)執(zhí)行綁定到channel的動(dòng)作。
下面綁定channel的動(dòng)作和是否綁定的判斷也可以驗(yàn)證這一點(diǎn)。
def?maybe_bind(self,?channel):
????"""Bind?instance?to?channel?if?not?already?bound."""
????if?not?self.is_bound?and?channel:
????????self._channel?=?maybe_channel(channel)
????????self.when_bound()
????????self._is_bound?=?True
????return?self
@property
def?is_bound(self):
????"""Flag?set?if?the?channel?is?bound."""
????return?self._is_bound?and?self._channel?is?not?None
exchange對(duì)象的創(chuàng)建和綁定到channel:
class?Exchange(MaybeChannelBound):
????def?__init__(self,?name='',?type='',?channel=None,?**kwargs):
????????super().__init__(**kwargs)
????????self.name?=?name?or?self.name
????????self.type?=?type?or?self.type
????????self.maybe_bind(channel)
????????...
創(chuàng)建完成的exchange對(duì)象需要進(jìn)行申明,申明的過(guò)程就是讓broker創(chuàng)建exchange的過(guò)程:
def?declare(self,?nowait=False,?passive=None,?channel=None):
????"""Declare?the?exchange.
????Creates?the?exchange?on?the?broker,?unless?passive?is?set
????in?which?case?it?will?only?assert?that?the?exchange?exists.
????Argument:
????????nowait?(bool):?If?set?the?server?will?not?respond,?and?a
????????????response?will?not?be?waited?for.?Default?is?:const:`False`.
????"""
????if?self._can_declare():
????????passive?=?self.passive?if?passive?is?None?else?passive
????????#?依托于channel
????????return?(channel?or?self.channel).exchange_declare(
????????????exchange=self.name,?type=self.type,?durable=self.durable,
????????????auto_delete=self.auto_delete,?arguments=self.arguments,
????????????nowait=nowait,?passive=passive,
????????)
queue對(duì)象創(chuàng)建完成后也需要綁定到channel:
class?Queue(MaybeChannelBound):
????def?__init__(self,?name='',?exchange=None,?routing_key='',
?????????????????channel=None,?bindings=None,?on_declared=None,
?????????????????**kwargs):
????????super().__init__(**kwargs)
????????self.name?=?name?or?self.name
????????self.maybe_bind(channel)
????????...
然后申明queue,這個(gè)過(guò)程包括下面3個(gè)步驟:
def?declare(self,?nowait=False,?channel=None):
????"""Declare?queue?and?exchange?then?binds?queue?to?exchange."""
????if?not?self.no_declare:
????????#?-?declare?main?binding.
????????self._create_exchange(nowait=nowait,?channel=channel)
????????self._create_queue(nowait=nowait,?channel=channel)
????????self._create_bindings(nowait=nowait,?channel=channel)
????return?self.name
def?_create_exchange(self,?nowait=False,?channel=None):
????if?self.exchange:
????????#?隱式申明exchange
????????self.exchange.declare(nowait=nowait,?channel=channel)
def?_create_queue(self,?nowait=False,?channel=None):
????#?申明queue
????self.queue_declare(nowait=nowait,?passive=False,?channel=channel)
????if?self.exchange?and?self.exchange.name:
????????#?綁定queue和exchange
????????self.queue_bind(nowait=nowait,?channel=channel)
def?_create_bindings(self,?nowait=False,?channel=None):
????for?B?in?self.bindings:
????????channel?=?channel?or?self.channel
????????B.declare(channel)
????????B.bind(self,?nowait=nowait,?channel=channel)
queue的申明也是讓broker創(chuàng)建queue:
def?queue_declare(self,?nowait=False,?passive=False,?channel=None):
????...
????ret?=?channel.queue_declare(
????????????queue=self.name,
????????????passive=passive,
????????????durable=self.durable,
????????????exclusive=self.exclusive,
????????????auto_delete=self.auto_delete,
????????????arguments=queue_arguments,
????????????nowait=nowait,
????????)
????...
queue比exchange多一個(gè)步驟就是bind到exchange。queue_bind的工作是讓broker創(chuàng)建queue和exchange的關(guān)聯(lián)關(guān)系。
def?queue_bind(self,?nowait=False,?channel=None):
????"""Create?the?queue?binding?on?the?server."""
????return?(channel?or?self.channel).queue_bind(
????????queue=self.name,
????????exchange=exchange,
????????routing_key=routing_key,
????????arguments=arguments,
????????nowait=nowait,
????)
從Exchange和Queue的實(shí)現(xiàn),我們可以知道生產(chǎn)者不用關(guān)心消費(fèi)者的實(shí)現(xiàn),只需要?jiǎng)?chuàng)建和申明exchange即可。消費(fèi)者則是需要知道生產(chǎn)者,除了創(chuàng)建和申明queue后,還需要綁定queue和exchange的關(guān)系。又因?yàn)橄M(fèi)者和生產(chǎn)者在不同的進(jìn)程,即使生成者創(chuàng)建了exchange,消費(fèi)者也需要在本地隱式創(chuàng)建exchange對(duì)象。
Message 解析
消息對(duì)象,除了純粹的數(shù)據(jù)結(jié)構(gòu)外,也包含channel的引用,畢竟消息可以直接執(zhí)行ack動(dòng)作:
class?Message:
????def?__init__(self,?body=None,?delivery_tag=None,
?????????????????content_type=None,?content_encoding=None,?delivery_info=None,
?????????????????properties=None,?headers=None,?postencode=None,
?????????????????accept=None,?channel=None,?**kwargs):
????????#?通道,主要的API來(lái)源
????????self.channel?=?channel
????????#?投遞標(biāo)簽,可以用來(lái)響應(yīng)
????????self.delivery_tag?=?delivery_tag
????????...
????????self.headers?=?headers?or?{}
????????self.body?=?body
????????...
????????self._state?=?'RECEIVED'
消息本身還帶有四個(gè)狀態(tài):
RECEIVED?默認(rèn)狀態(tài)ACK?完成ack響應(yīng)REJECTED?拒絕消息REQUEUED?重新投遞消息
其中?{'ACK', 'REJECTED', 'REQUEUED'}?三個(gè)狀態(tài)的轉(zhuǎn)換都需要使用channel進(jìn)行操作broker,成功后再切換:
def?ack(self,?multiple=False):
????#?回應(yīng)ACK
????self.channel.basic_ack(self.delivery_tag,?multiple=multiple)
????self._state?=?'ACK'
def?reject(self,?requeue=False):
????#?拒絕(拋棄消息)
????self.channel.basic_reject(self.delivery_tag,?requeue=requeue)
????self._state?=?'REJECTED'
def?requeue(self):
????#?拒絕(退回消息)(和reject區(qū)別在requeue=True)
????self.channel.basic_reject(self.delivery_tag,?requeue=True)
????self._state?=?'REQUEUED'
消息上附帶的信息,通過(guò)不同的load方法進(jìn)行序列化:
from?.serialization?import?loads
@property
def?payload(self):
????return?loads(self.body,?self.content_type,
?????????????????????self.content_encoding,?accept=self.accept)????
Connection 解析
Connection負(fù)責(zé)管理producer/consumer到broker的網(wǎng)絡(luò)連接:
class?Connection:
????def?__init__(self,?hostname='localhost',?userid=None,
?????????????????password=None,?virtual_host=None,?port=None,?insist=False,
?????????????????ssl=False,?transport=None,?connect_timeout=5,
?????????????????transport_options=None,?login_method=None,?uri_prefix=None,
?????????????????heartbeat=0,?failover_strategy='round-robin',
?????????????????alternates=None,?**kwargs):
????????...
????????params?=?self._initial_params?=?{
????????????'hostname':?hostname,?'userid':?userid,
????????????'password':?password,?'virtual_host':?virtual_host,
????????????'port':?port,?'insist':?insist,?'ssl':?ssl,
????????????'transport':?transport,?'connect_timeout':?connect_timeout,
????????????'login_method':?login_method,?'heartbeat':?heartbeat
????????}
????????...
????????
????????self._init_params(**params)
????????...
重點(diǎn)在_init_params中對(duì)各種支持AQMP協(xié)議的broker的管理, 比如redis,RobbitMQ:
def?_init_params(self,?hostname,?userid,?password,?virtual_host,?port,
?????????????????insist,?ssl,?transport,?connect_timeout,
?????????????????login_method,?heartbeat):
????transport?=?transport?or?'amqp'
????if?transport?==?'amqp'?and?supports_librabbitmq():
????????transport?=?'librabbitmq'
????if?transport?==?'rediss'?and?ssl_available?and?not?ssl:
????????logger.warning(
????????????'Secure?redis?scheme?specified?(rediss)?with?no?ssl?'
????????????'options,?defaulting?to?insecure?SSL?behaviour.'
????????)
????????ssl?=?{'ssl_cert_reqs':?CERT_NONE}
????self.hostname?=?hostname
????self.userid?=?userid
????self.password?=?password
????self.login_method?=?login_method
????#?虛擬主機(jī)隔離
????self.virtual_host?=?virtual_host?or?self.virtual_host
????self.port?=?port?or?self.port
????self.insist?=?insist
????self.connect_timeout?=?connect_timeout
????self.ssl?=?ssl
????#?傳輸類
????self.transport_cls?=?transport
????self.heartbeat?=?heartbeat?and?float(heartbeat)
配置完connection信息后,就需要?jiǎng)?chuàng)建網(wǎng)絡(luò)連接。這個(gè)過(guò)程通過(guò)調(diào)用connection屬性或者default_channel屬性時(shí)候自動(dòng)創(chuàng)建:
@property
def?connection(self):
????"""The?underlying?connection?object.
????Warning:
????????This?instance?is?transport?specific,?so?do?not
????????depend?on?the?interface?of?this?object.
????"""
????if?not?self._closed:
????????if?not?self.connected:
????????????#?創(chuàng)建連接
????????????return?self._ensure_connection(
????????????????max_retries=1,?reraise_as_library_errors=False
????????????)
????????return?self._connection
????????
@property
def?default_channel(self):
????"""Default?channel.
????Created?upon?access?and?closed?when?the?connection?is?closed.
????Note:
????????Can?be?used?for?automatic?channel?handling?when?you?only?need?one
????????channel,?and?also?it?is?the?channel?implicitly?used?if
????????a?connection?is?passed?instead?of?a?channel,?to?functions?that
????????require?a?channel.
????"""
????#?make?sure?we're?still?connected,?and?if?not?refresh.
????conn_opts?=?self._extract_failover_opts()
????#?創(chuàng)建連接
????self._ensure_connection(**conn_opts)
????if?self._default_channel?is?None:
????????self._default_channel?=?self.channel()
????return?self._default_channel
連接創(chuàng)建完成后,繼續(xù)創(chuàng)建channel:
def?channel(self):
????"""Create?and?return?a?new?channel."""
????self._debug('create?channel')
????chan?=?self.transport.create_channel(self.connection)
????return?chan
def?create_transport(self):
????#?創(chuàng)建傳輸連接
????return?self.get_transport_cls()(client=self)
def?get_transport_cls(self):
????"""Get?the?currently?used?transport?class."""
????transport_cls?=?self.transport_cls
????if?not?transport_cls?or?isinstance(transport_cls,?str):
????????transport_cls?=?get_transport_cls(transport_cls)
????return?transport_cls
創(chuàng)建broker的連接過(guò)程,是通過(guò)transport的創(chuàng)建,其中細(xì)節(jié)涉及對(duì)不同類型的broker服務(wù)的適配,內(nèi)容挺多,我們下一章再進(jìn)行解析。
Matcher && serialization
Matcher負(fù)責(zé)處理消息的匹配機(jī)制,serialization復(fù)雜消息的序列化。兩者的實(shí)現(xiàn)方式類似,都使用注冊(cè)中心模式+策略模式實(shí)現(xiàn)。
Matcher的注冊(cè)中心:
class?MatcherRegistry:
????"""Pattern?matching?function?registry."""
????"""匹配器的注冊(cè)中心"""
????MatcherNotInstalled?=?MatcherNotInstalled
????matcher_pattern_first?=?["pcre",?]
????def?__init__(self):
????????self._matchers?=?{}
????????self._default_matcher?=?None
#:?Global?registry?of?matchers.
registry?=?MatcherRegistry()
注冊(cè)glob(模糊)模式和pcre(正則)模式兩種策略:
def?register_glob():
????"""Register?glob?into?default?registry."""
????"""使用glob(通配符)匹配"""
????registry.register('glob',?fnmatch)
def?register_pcre():
????"""Register?pcre?into?default?registry."""
????"""使用正則匹配"""
????registry.register('pcre',?rematch)
#?Register?the?base?matching?methods.
register_glob()
register_pcre()
匹配消息的方法,就是使用模式進(jìn)行識(shí)別:
def?match(self,?data,?pattern,?matcher=None,?matcher_kwargs=None):
????"""Call?the?matcher."""
????if?matcher?and?not?self._matchers.get(matcher):
????????raise?self.MatcherNotInstalled(
????????????f'No?matcher?installed?for?{matcher}'
????????)
????#?默認(rèn)使用通配符匹配
????match_func?=?self._matchers[matcher?or?'glob']
????#?通配符和正則匹配的傳參先后順序有差異
????if?matcher?in?self.matcher_pattern_first:
????????first_arg?=?bytes_to_str(pattern)
????????second_arg?=?bytes_to_str(data)
????else:
????????first_arg?=?bytes_to_str(data)
????????second_arg?=?bytes_to_str(pattern)
????return?match_func(first_arg,?second_arg,?**matcher_kwargs?or?{})
Serializer的注冊(cè)中心:
class?SerializerRegistry:
????"""The?registry?keeps?track?of?serialization?methods."""
????"""序列化方法的注冊(cè)中心"""
????def?__init__(self):
????????self._encoders?=?{}
????????self._decoders?=?{}
????????self._default_encode?=?None
????????self._default_content_type?=?None
????????self._default_content_encoding?=?None
????????#?記錄禁用的編解碼類型
????????self._disabled_content_types?=?set()
????????#?雙向字典,可以進(jìn)行互查
????????self.type_to_name?=?{}
????????self.name_to_type?=?{}
#?全局單例,并且導(dǎo)出函數(shù)綁定,使用API更簡(jiǎn)介
registry?=?SerializerRegistry()
dumps?=?registry.dumps
loads?=?registry.loads
register?=?registry.register
unregister?=?registry.unregister
json, yaml, pickle和msgpack四種序列化策略的注冊(cè):
def?register_json():
????"""Register?a?encoder/decoder?for?JSON?serialization."""
????from?kombu.utils?import?json?as?_json
????registry.register('json',?_json.dumps,?_json.loads,
??????????????????????content_type='application/json',
??????????????????????content_encoding='utf-8')
def?register_yaml():
????"""Register?a?encoder/decoder?for?YAML?serialization.
????It?is?slower?than?JSON,?but?allows?for?more?data?types
????to?be?serialized.?Useful?if?you?need?to?send?data?such?as?dates
????"""
????import?yaml
????registry.register('yaml',?yaml.safe_dump,?yaml.safe_load,
??????????????????????content_type='application/x-yaml',
??????????????????????content_encoding='utf-8')
def?register_pickle():
????"""Register?pickle?serializer.
????The?fastest?serialization?method,?but?restricts
????you?to?python?clients.
????"""
????def?pickle_dumps(obj,?dumper=pickle.dumps):
????????return?dumper(obj,?protocol=pickle_protocol)
????registry.register('pickle',?pickle_dumps,?unpickle,
??????????????????????content_type='application/x-python-serialize',
??????????????????????content_encoding='binary')
def?register_msgpack():
????"""Register?msgpack?serializer.
????See?Also:
????????https://msgpack.org/.
????"""
????pack?=?unpack?=?None
????import?msgpack
????from?msgpack?import?packb,?unpackb
????def?pack(s):
????????return?packb(s,?use_bin_type=True)
????def?unpack(s):
????????return?unpackb(s,?raw=False)
????????
????registry.register(
????????'msgpack',?pack,?unpack,
????????content_type='application/x-msgpack',
????????content_encoding='binary',
????)
register_json()
register_pickle()
register_yaml()
register_msgpack()
反序列化的使用:
#?kombu-5.0.0/kombu/serialization.py:285
#?導(dǎo)出策略
loads?=?registry.loads
#?kombu-5.0.0/kombu/message.py:10
from?.serialization?import?loads
class?Message:
????def?_decode(self):
????????#?使用策略反序列化message-body
????????return?loads(self.body,?self.content_type,
?????????????????????self.content_encoding,?accept=self.accept)
小結(jié)
通過(guò)kombu的Producer可以發(fā)送消息到broker,使用Comsumer則可以消費(fèi)消息。發(fā)送消息的時(shí)候需要使用Exchange,用來(lái)將消費(fèi)分發(fā)到不同的目標(biāo)Queue;消費(fèi)消息的時(shí)候,需要使用Queue,Queue還需要通過(guò)綁定的方式和Exchange關(guān)聯(lián)起來(lái)。Exchange和Queue都是使用底層的channel進(jìn)行數(shù)據(jù)傳輸,所以需要進(jìn)綁定(binding);還需要在遠(yuǎn)程的broker中創(chuàng)建,所以創(chuàng)建后的的Exchange和Queue需要進(jìn)行申明(declare)。消息會(huì)附帶上投遞信息,進(jìn)行序列化后從生產(chǎn)者到broker轉(zhuǎn)發(fā)給消費(fèi)者,消費(fèi)者再使用投遞信息上的序列化約定,將消息反序列成業(yè)務(wù)信息。
小技巧
pickle打包函數(shù)
pickle不僅支持?jǐn)?shù)據(jù)接口的序列化,還支持函數(shù)的序列化:
python3
Python?3.8.5?(v3.8.5:580fbb018f,?Jul?20?2020,?12:11:27)
[Clang?6.0?(clang-600.0.57)]?on?darwin
Type?"help",?"copyright",?"credits"?or?"license"?for?more?information.
>>>?import?pickle
>>>
>>>?def?hello(msg):
...?????print("hello",?msg)
...
>>>?p?=?pickle.dumps(hello)
>>>?p
b'\x80\x04\x95\x16\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x05hello\x94\x93\x94.'
>>>
>>>?q?=?pickle.loads(p)
>>>
>>>?q("python")
hello?python
>>>
上面的hello函數(shù)可以通過(guò)pickle打包,再重新解包執(zhí)行。利用這個(gè)機(jī)制使用kombu,可以將producer進(jìn)程的函數(shù)發(fā)送到consumer進(jìn)程遠(yuǎn)程執(zhí)行。pickle支持的數(shù)據(jù)類型還挺豐富,官方文檔中介紹包括下面多種類型:
The?following?types?can?be?pickled:
*?None,?True,?and?False
*?integers,?floating?point?numbers,?complex?numbers
*?strings,?bytes,?bytearrays
*?tuples,?lists,?sets,?and?dictionaries?containing?only?picklable?objects
*?functions?defined?at?the?top?level?of?a?module?(using?def,?not?lambda)
*?built-in?functions?defined?at?the?top?level?of?a?module
*?classes?that?are?defined?at?the?top?level?of?a?module
*?instances?of?such?classes?whose?__dict__?or?the?result?of?calling?__getstate__()?is?picklable?(see?section?Pickling?Class?Instances?for?details).
配置類的簡(jiǎn)化
Object提供了一種快速構(gòu)建對(duì)象的方法:
class?Object:
????"""Common?base?class.
????Supports?automatic?kwargs->attributes?handling,?and?cloning.
????"""
????attrs?=?()
????def?__init__(self,?*args,?**kwargs):
????????#?attrs?在子類中定義
????????for?name,?type_?in?self.attrs:
????????????value?=?kwargs.get(name)
????????????#?從字典參數(shù)給屬性動(dòng)態(tài)賦值
????????????if?value?is?not?None:
????????????????setattr(self,?name,?(type_?or?_any)(value))
????????????else:
????????????????try:
????????????????????getattr(self,?name)
????????????????except?AttributeError:
????????????????????setattr(self,?name,?None)
Queue展示了這種方式的示例,比如max_length屬性:
class?Queue(MaybeChannelBound):
????attrs?=?(
????????..
????????('max_length',?int),
????????...
????)
????def?__init__(self,?name='',?exchange=None,?routing_key='',
?????????????????channel=None,?bindings=None,?on_declared=None,
?????????????????**kwargs):
????????self.name?=?name?or?self.name
????????...
????
????def?queue_declare(self,?nowait=False,?passive=False,?channel=None):
????????...
????????queue_arguments?=?channel.prepare_queue_arguments(
????????????self.queue_arguments?or?{},
????????????expires=self.expires,
????????????message_ttl=self.message_ttl,
????????????max_length=self.max_length,
????????????max_length_bytes=self.max_length_bytes,
????????????max_priority=self.max_priority,
????????)
????????...
在Queue的構(gòu)造函數(shù)中并沒有定義max_length屬性,但是queue_declare中卻可以直接使用這個(gè)屬性,可以對(duì)比name屬性感受一下差異。這對(duì)我們簡(jiǎn)化定義屬性很多的對(duì)象有幫助,比如一些配置類。
使用count提供自增ID
itertools.count提供了一種通過(guò)迭代器生成遞增ID的方法:
>>>?from?itertools?import?count
>>>
>>>?for?i?in?count():
...?????if?i?%?10?==?0:
...?????????????print(i)
...?????if?i>50:
...?????????????break
...
0
10
20
30
40
50
參考鏈接
https://github.com/celery/kombu Talking to RabbitMQ with Python and Kombu https://medium.com/python-pandemonium/talking-to-rabbitmq-with-python-and-kombu-6cbee93b1298 一篇文章講透徹了AMQP協(xié)議 https://jishuin.proginn.com/p/763bfbd2a068

還不過(guò)癮?試試它們
▲Python進(jìn)階:自定義對(duì)象實(shí)現(xiàn)切片功能
▲為什么 Python 3 ?把 print 改為函數(shù)?
