<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python 神器 Celery 源碼解析(3)

          共 26238字,需瀏覽 53分鐘

           ·

          2021-10-30 18:33

          △點(diǎn)擊上方“Python貓”關(guān)注 ,回復(fù)“1”領(lǐng)取電子書

          Celery是一款非常簡(jiǎn)單、靈活、可靠的分布式系統(tǒng),可用于處理大量消息,并且提供了一整套操作此系統(tǒng)的工具。Celery 也是一款消息隊(duì)列工具,可用于處理實(shí)時(shí)數(shù)據(jù)以及任務(wù)調(diào)度。

          本文是是celery源碼解析的第三篇,在前兩篇里分別介紹了vine和py-amqp:

          1. 神器 celery 源碼解析- vine實(shí)現(xiàn)Promise功能
          2. 神器 celery 源碼解析- py-amqp實(shí)現(xiàn)AMQP協(xié)議

          本篇我們繼續(xù)celery的基礎(chǔ)庫(kù): kombu,一個(gè)python實(shí)現(xiàn)的消息庫(kù),在celery中承擔(dān)核心的消息處理流程。本文包括下面幾個(gè)部分:

          • AMQP協(xié)議
          • kombu概述
          • kombu使用指南
          • Producer && Consumer 解析
          • Exchange && Queue 解析
          • Message 解析
          • Connection 解析
          • Matcher && serialization
          • 小結(jié)
          • 小技巧

          AMQP 概念

          接上篇,我們繼續(xù)學(xué)習(xí)AMQP的相關(guān)概念。理解這些基礎(chǔ)概念對(duì)kombu為什么這樣實(shí)現(xiàn)很有幫助。這次我們用小故事來(lái)模擬kombu的消息處理流程。

          小學(xué)三年級(jí)的小明同學(xué)喜歡同桌的小紅同學(xué),喜歡她的馬尾和笑容,經(jīng)常寫小紙條給她。這里小紙條就是Message,小明同學(xué)是Producer, 小紅同學(xué)是Consumer,這種直接投遞的方式是direct。有時(shí)候,小紅同學(xué)不在座位上,小明就把紙條放在她的抽屜里。抽屜就當(dāng)做Queue使用,臨時(shí)存放投遞的消息。老師發(fā)現(xiàn)小明和小紅上課經(jīng)常有小動(dòng)作后,棒打鴛鴦把他們分開了,他們不再是同桌。小明同學(xué)沒法忘記小紅的笑容,距離產(chǎn)生了更多的美,就拜托前面的小馬幫他遞小紙條,紙條封面上寫著“請(qǐng)給小紅”。小馬就是Exchange,小馬的前座也是Exchange,“請(qǐng)給小紅”就是消息的route-key。常在河邊走,哪有不濕腳。有次紙條被老師抓住,老師讓小明同學(xué)在講臺(tái)上把紙條的內(nèi)容講給大家聽。當(dāng)眾念小紙條這叫廣播, 也就是fanout。

          幼稚的小故事也是一種真實(shí)的生活,誰(shuí)又沒有寫過(guò)小紙條呢,請(qǐng)暫停回憶一分鐘:) 。業(yè)務(wù)是生活場(chǎng)景的一種抽象,代碼又是更高層一點(diǎn)的抽象。理解業(yè)務(wù),就對(duì)代碼上的概念不發(fā)楞。

          以上這些概念Exchange,Queue都是broker要實(shí)現(xiàn)的內(nèi)容。可是客戶端Producer/Consumer也包含,這是為什么呢?消息傳輸過(guò)程可不可以簡(jiǎn)化成一個(gè)客戶端只使用producer發(fā)送消息,另外一個(gè)客戶端只使用consumer消費(fèi)消息呢?這樣也不是不行,前提是AMQP協(xié)議中exchange和queue的創(chuàng)建及綁定,需要使用管理工具在broker先創(chuàng)建好,這無(wú)疑約束了AMPQ使用的靈活性。kombu中包含了Exchange,Queue模型,主要是用來(lái)對(duì)broker的管理。

          kombu概述

          kombu是植物家族的重要一員, 芹菜(celery)、葡萄藤(vine)、海帶(kombu)是快樂的一家人。我們解析kombu,采用的版本是?5.0.0, 主要模塊如下:

          模塊功能
          abstract.py抽象的綁定實(shí)現(xiàn),對(duì)象是否可以綁定到channel
          compression.py壓縮算法的匯總
          connection.pybroker的連接
          entity.py實(shí)體類,包括Exchange,binding和Queue對(duì)象的實(shí)現(xiàn)
          matcher.py匹配策略
          message.py消息對(duì)象,并且附帶消息的操作接口ack,reject等
          messaging.py消息處理,包括Producer和Consumer
          mixins.py,pools.py,simple.py增強(qiáng)功能或者提升便捷使用的封裝
          serialization.py序列化算法的匯總
          transport對(duì)接各種存儲(chǔ)引擎的數(shù)據(jù)傳輸實(shí)現(xiàn),主要有內(nèi)存,redis,pyamqp(RabbitMQ) 等
          asynchronous異步實(shí)現(xiàn)

          kombu底層使用pyamqp提供的AMQP協(xié)議支持,并完成Producer,Consumer,Exchange,Queue等模型實(shí)現(xiàn)。

          kombu 使用指南

          老規(guī)矩,先從kombu的使用開始。下面是一個(gè)生產(chǎn)者發(fā)送消息的示例:

          #?kombu-5.0.0/examples/complete_send.py

          from?kombu?import?Connection,?Producer,?Exchange,?Queue

          exchange?=?Exchange('kombu_demo',?type='direct')

          with?Connection('amqp://guest:guest@localhost:5672//')?as?connection:

          ????producer?=?Producer(connection)
          ????#?消息需要使用exchange
          ????producer.publish({'hello':?'world'},
          ?????????????????????exchange=exchange,
          ?????????????????????routing_key='kombu_demo',
          ?????????????????????serializer='json',?compression='zlib')

          生產(chǎn)者示例包括下面幾步:

          • 創(chuàng)建名為kombu_demo的exchange
          • 創(chuàng)建到broker的connection并使用其作為上下文
          • 使用connection創(chuàng)建發(fā)送消息的producer
          • 使用創(chuàng)建完成的producer發(fā)送普通的json消息到創(chuàng)建好的exchange,并且指明routing_key為kombu_demo。約定消息使用json序列化,zlib算法壓縮。

          消費(fèi)者的示例會(huì)略微復(fù)雜一點(diǎn):

          kombu-5.0.0/examples/complete_receive.py

          from?pprint?import?pformat

          from?kombu?import?Connection,?Exchange,?Queue,?Consumer,?eventloop

          exchange?=?Exchange('kombu_demo',?type='direct')
          queue?=?Queue('kombu_demo',?exchange,?routing_key='kombu_demo')

          #?格式化函數(shù)
          def?pretty(obj):
          ????return?pformat(obj,?indent=4)

          #:?This?is?the?callback?applied?when?a?message?is?received.
          def?handle_message(body,?message):
          ????print(f'Received?message:?{body!r}')
          ????print('??properties:\n{}'.format(pretty(message.properties)))
          ????print('??delivery_info:\n{}'.format(pretty(message.delivery_info)))
          ????message.ack()

          with?Connection('amqp://guest:guest@localhost:5672//')?as?connection:

          ????with?Consumer(connection,?queue,?callbacks=[handle_message]):

          ????????for?_?in?eventloop(connection):
          ????????????pass

          消費(fèi)者示例主要包括下面幾步:

          • 同樣創(chuàng)建名為kombu_demo的exchange
          • 創(chuàng)建名為kombu_demo的queue, 綁定到exchange,并且設(shè)置消費(fèi)的routing_key
          • 創(chuàng)建callback函數(shù),接收body和message。body是純粹的業(yè)務(wù)信息,message則包含一些投遞信息,并且可以使用message直接執(zhí)行ack回應(yīng)給broker。
          • 和生產(chǎn)者一樣,創(chuàng)建到broker的connection并使用其作為上下文
          • 使用connection創(chuàng)建消費(fèi)者,消費(fèi)者需要綁定到queue,并且設(shè)置callback函數(shù)
          • 持續(xù)監(jiān)聽connection上的事件循環(huán)

          我們?cè)倩仡^看看下圖,對(duì)比一下示例,加強(qiáng)理解:

          hello-world-example-routing

          示例中的生產(chǎn)者位于圖的左半?yún)^(qū),消費(fèi)者位于圖的右半?yún)^(qū)。中間部分的broker,在文章的第一篇里,我們使用redis服務(wù)作為broker。示例還有重要的一點(diǎn)就是,全程沒有創(chuàng)建channel,都是自動(dòng)創(chuàng)建的。一般情況下,我們有3個(gè)進(jìn)程,Producer進(jìn)程和Consumer進(jìn)程通過(guò)Broker進(jìn)程進(jìn)行消息的處理,這是一個(gè)典型的分布式系統(tǒng)。

          Producer && Consumer 解析

          Proudcer解析

          Proudcer的構(gòu)造函數(shù):

          class?Producer:
          ????def?__init__(self,?channel,?exchange=None,?routing_key=None,
          ?????????????????serializer=None,?auto_declare=None,?compression=None,
          ?????????????????on_return=None):
          ????????self._channel?=?channel
          ????????self.exchange?=?exchange
          ????????self.routing_key?=?routing_key?or?self.routing_key
          ????????self.serializer?=?serializer?or?self.serializer
          ????????self.compression?=?compression?or?self.compression
          ????????self.on_return?=?on_return?or?self.on_return
          ????????self._channel_promise?=?None
          ????????if?self.exchange?is?None:
          ????????????#?默認(rèn)的exchange
          ????????????self.exchange?=?Exchange('')
          ????????...

          ????????if?self._channel:
          ????????????self.revive(self._channel)
          ????
          ????def?revive(self,?channel):
          ????????"""Revive?the?producer?after?connection?loss."""
          ????????if?is_connection(channel):
          ????????????connection?=?channel
          ????????????self.__connection__?=?connection
          ????????????channel?=?ChannelPromise(lambda:?connection.default_channel)
          ????????if?isinstance(channel,?ChannelPromise):
          ????????????self._channel?=?channel
          ????????????self.exchange?=?self.exchange(channel)
          ????????else:
          ????????????#?Channel?already?concrete
          ????????????self._channel?=?channel
          ????????????if?self.on_return:
          ????????????????self._channel.events['basic_return'].add(self.on_return)
          ????????????self.exchange?=?self.exchange(channel)

          Producer除了設(shè)置自身的屬性外,還包括對(duì)channel的處理。前文介紹過(guò)connection也是channel的一種,這里要先處理好connection,然后再?gòu)腸onnection獲得默認(rèn)的channel。同時(shí)對(duì)于已經(jīng)成功的channel,則進(jìn)行將producer綁定到channel。self.exchange(channel)?等同于?self.exchange.__call__(channel)。producer創(chuàng)建完成后,可以通過(guò)publish方法發(fā)送消息:

          def?publish(self,?body,?routing_key=None,?delivery_mode=None,
          ????????????????mandatory=False,?immediate=False,?priority=0,
          ????????????????content_type=None,?content_encoding=None,?serializer=None,
          ????????????????headers=None,?compression=None,?exchange=None,?retry=False,
          ????????????????retry_policy=None,?declare=None,?expiration=None,?timeout=None,
          ????????????????**properties):
          ????#?初始化routing-key,?exchange
          ????routing_key?=?self.routing_key?if?routing_key?is?None?else?routing_key
          ????exchange_name,?properties['delivery_mode']?=?self._delivery_details(
          ????????????exchange?or?self.exchange,?delivery_mode,
          ????????)
          ????#?準(zhǔn)備body和body類型,編碼
          ????body,?content_type,?content_encoding?=?self._prepare(
          ????????????body,?serializer,?content_type,?content_encoding,
          ????????????compression,?headers)
          ????
          ????#?使用message封裝body
          ????message?=?self.channel.prepare_message(
          ????????body,?priority,?content_type,
          ????????content_encoding,?headers,?properties,
          ????)
          ????...
          ????#?利用channel發(fā)送消息
          ????return?channel.basic_publish(
          ????????message,
          ????????exchange=exchange,?routing_key=routing_key,
          ????????mandatory=mandatory,?immediate=immediate,
          ????????timeout=timeout
          ????)

          Producer是對(duì)channel的業(yè)務(wù)封裝,創(chuàng)建時(shí)候有channel則使用channel,沒有channel則使用connection的default_channel。Producer發(fā)送消息的過(guò)程,完成exchange和message包裝后,使用channel進(jìn)行發(fā)送。

          Consumer解析

          Consumer的構(gòu)造函數(shù)和上下文:

          class?Consumer:
          ????
          ????def?__init__(self,?channel,?queues=None,?no_ack=None,?auto_declare=None,
          ?????????????????callbacks=None,?on_decode_error=None,?on_message=None,
          ?????????????????accept=None,?prefetch_count=None,?tag_prefix=None):
          ????????self.channel?=?channel
          ????????#?Queue的列表
          ????????self.queues?=?maybe_list(queues?or?[])
          ????????self.no_ack?=?self.no_ack?if?no_ack?is?None?else?no_ack
          ????????#?消息的回調(diào)函數(shù)
          ????????self.callbacks?=?(self.callbacks?or?[]?if?callbacks?is?None
          ??????????????????????????else?callbacks)
          ????????#?自定義的消息處理方法
          ????????self.on_message?=?on_message
          ????????self.tag_prefix?=?tag_prefix
          ????????self._active_tags?=?{}
          ????????...

          ????????if?self.channel:
          ????????????self.revive(self.channel)
          ????
          ????def?revive(self,?channel):
          ????????"""Revive?consumer?after?connection?loss."""
          ????????self._active_tags.clear()
          ????????channel?=?self.channel?=?maybe_channel(channel)
          ????????#?modify?dict?size?while?iterating?over?it?is?not?allowed
          ????????for?qname,?queue?in?list(self._queues.items()):
          ????????????#?name?may?have?changed?after?declare
          ????????????self._queues.pop(qname,?None)
          ????????????queue?=?self._queues[queue.name]?=?queue(self.channel)
          ????????????#?queue和channel綁定
          ????????????queue.revive(channel)
          ????????...
          ????
          ????def?__enter__(self):
          ????????self.consume()
          ????????return?self

          Consumer和Producer類似,設(shè)置完屬性后也要處理好channel,不同的是其中的queue(在producer中是exchange)和channel綁定并提供一個(gè)上下文環(huán)境。在上下文環(huán)境中進(jìn)行消息消費(fèi):

          def?consume(self,?no_ack=None):
          ????tag?=?self._add_tag(queue,?consumer_tag)
          ????#?每個(gè)queue消息消息
          ????for?queue?in?self._queues:
          ????????queue.consume(tag,?self._receive_callback,
          ??????????????????????????no_ack=no_ack,?nowait=nowait)

          def?_receive_callback(self,?message):
          ????accept?=?self.accept
          ????on_m,?channel,?decoded?=?self.on_message,?self.channel,?None
          ????try:
          ????????...
          ????????#?消息反序列化
          ????????decoded?=?None?if?on_m?else?message.decode()
          ????except?Exception?as?exc:
          ????????if?not?self.on_decode_error:
          ????????????raise
          ????????self.on_decode_error(message,?exc)
          ????else:
          ????????return?on_m(message)?if?on_m?else?self.receive(decoded,?message)

          def?receive(self,?body,?message):
          ????"""Method?called?when?a?message?is?received.

          ????This?dispatches?to?the?registered?:attr:`callbacks`.

          ????Arguments:
          ????????body?(Any):?The?decoded?message?body.
          ????????message?(~kombu.Message):?The?message?instance.

          ????Raises:
          ????????NotImplementedError:?If?no?consumer?callbacks?have?been
          ????????????registered.
          ????"
          ""
          ????#?執(zhí)行callback
          ????callbacks?=?self.callbacks
          ????...
          ????#?默認(rèn)就是body和message回傳給業(yè)務(wù)函數(shù)
          ????[callback(body,?message)?for?callback?in?callbacks]

          consumer可以使用多個(gè)queue,每個(gè)queue消費(fèi)消息的時(shí)候可以使用覆蓋處理函數(shù)或者使用系統(tǒng)的處理函數(shù)。一般情況下callback會(huì)獲得到解碼后的body和消息原文。如何持續(xù)的消費(fèi)消息,在connection部分再介紹。

          Exchange && Queue 解析

          producer需要使用exchange,consumer需要使用queue,消息是通過(guò)exchange和queue搭橋傳遞的。Exchange和Queue有共同的父類MaybeChannelBound:

          ??????????????+-------------------+
          ??????????????|?MaybeChannelBound?|
          ??????????????+-------^-----------+
          ??????????????????????|
          ?????+----------------+----------------+
          ?????|?????????????????????????????????|
          +----+-----+???????????????????????+---+---+
          |?Exchange?|???????????????????????|?Queue?|
          +----------+???????????????????????+-------+

          MaybeChannelBound約定了類對(duì)channel的綁定行為:

          class?MaybeChannelBound(Object):
          ????
          ????_channel?=?None
          ????_is_bound?=?False
          ????
          ????def?__call__(self,?channel):
          ????????"""`self(channel)?->?self.bind(channel)`."""
          ????????return?self.bind(channel)
          • _channel 和 _is_bound 都是類屬性,可以知道channel在類上重用
          • __call__魔法函數(shù)讓類方法, 比如exchange(channel)和queue(channel)執(zhí)行的時(shí)候會(huì)自動(dòng)執(zhí)行綁定到channel的動(dòng)作。

          下面綁定channel的動(dòng)作和是否綁定的判斷也可以驗(yàn)證這一點(diǎn)。

          def?maybe_bind(self,?channel):
          ????"""Bind?instance?to?channel?if?not?already?bound."""
          ????if?not?self.is_bound?and?channel:
          ????????self._channel?=?maybe_channel(channel)
          ????????self.when_bound()
          ????????self._is_bound?=?True
          ????return?self

          @property
          def?is_bound(self):
          ????"""Flag?set?if?the?channel?is?bound."""
          ????return?self._is_bound?and?self._channel?is?not?None

          exchange對(duì)象的創(chuàng)建和綁定到channel:

          class?Exchange(MaybeChannelBound):
          ????def?__init__(self,?name='',?type='',?channel=None,?**kwargs):
          ????????super().__init__(**kwargs)
          ????????self.name?=?name?or?self.name
          ????????self.type?=?type?or?self.type
          ????????self.maybe_bind(channel)
          ????????...

          創(chuàng)建完成的exchange對(duì)象需要進(jìn)行申明,申明的過(guò)程就是讓broker創(chuàng)建exchange的過(guò)程:

          def?declare(self,?nowait=False,?passive=None,?channel=None):
          ????"""Declare?the?exchange.

          ????Creates?the?exchange?on?the?broker,?unless?passive?is?set
          ????in?which?case?it?will?only?assert?that?the?exchange?exists.

          ????Argument:
          ????????nowait?(bool):?If?set?the?server?will?not?respond,?and?a
          ????????????response?will?not?be?waited?for.?Default?is?:const:`False`.
          ????"
          ""
          ????if?self._can_declare():
          ????????passive?=?self.passive?if?passive?is?None?else?passive
          ????????#?依托于channel
          ????????return?(channel?or?self.channel).exchange_declare(
          ????????????exchange=self.name,?type=self.type,?durable=self.durable,
          ????????????auto_delete=self.auto_delete,?arguments=self.arguments,
          ????????????nowait=nowait,?passive=passive,
          ????????)

          queue對(duì)象創(chuàng)建完成后也需要綁定到channel:

          class?Queue(MaybeChannelBound):
          ????def?__init__(self,?name='',?exchange=None,?routing_key='',
          ?????????????????channel=None,?bindings=None,?on_declared=None,
          ?????????????????**kwargs):
          ????????super().__init__(**kwargs)
          ????????self.name?=?name?or?self.name
          ????????self.maybe_bind(channel)
          ????????...

          然后申明queue,這個(gè)過(guò)程包括下面3個(gè)步驟:

          def?declare(self,?nowait=False,?channel=None):
          ????"""Declare?queue?and?exchange?then?binds?queue?to?exchange."""
          ????if?not?self.no_declare:
          ????????#?-?declare?main?binding.
          ????????self._create_exchange(nowait=nowait,?channel=channel)
          ????????self._create_queue(nowait=nowait,?channel=channel)
          ????????self._create_bindings(nowait=nowait,?channel=channel)
          ????return?self.name

          def?_create_exchange(self,?nowait=False,?channel=None):
          ????if?self.exchange:
          ????????#?隱式申明exchange
          ????????self.exchange.declare(nowait=nowait,?channel=channel)

          def?_create_queue(self,?nowait=False,?channel=None):
          ????#?申明queue
          ????self.queue_declare(nowait=nowait,?passive=False,?channel=channel)
          ????if?self.exchange?and?self.exchange.name:
          ????????#?綁定queue和exchange
          ????????self.queue_bind(nowait=nowait,?channel=channel)

          def?_create_bindings(self,?nowait=False,?channel=None):
          ????for?B?in?self.bindings:
          ????????channel?=?channel?or?self.channel
          ????????B.declare(channel)
          ????????B.bind(self,?nowait=nowait,?channel=channel)

          queue的申明也是讓broker創(chuàng)建queue:

          def?queue_declare(self,?nowait=False,?passive=False,?channel=None):
          ????...
          ????ret?=?channel.queue_declare(
          ????????????queue=self.name,
          ????????????passive=passive,
          ????????????durable=self.durable,
          ????????????exclusive=self.exclusive,
          ????????????auto_delete=self.auto_delete,
          ????????????arguments=queue_arguments,
          ????????????nowait=nowait,
          ????????)
          ????...

          queue比exchange多一個(gè)步驟就是bind到exchange。queue_bind的工作是讓broker創(chuàng)建queue和exchange的關(guān)聯(lián)關(guān)系。

          def?queue_bind(self,?nowait=False,?channel=None):
          ????"""Create?the?queue?binding?on?the?server."""
          ????return?(channel?or?self.channel).queue_bind(
          ????????queue=self.name,
          ????????exchange=exchange,
          ????????routing_key=routing_key,
          ????????arguments=arguments,
          ????????nowait=nowait,
          ????)

          從Exchange和Queue的實(shí)現(xiàn),我們可以知道生產(chǎn)者不用關(guān)心消費(fèi)者的實(shí)現(xiàn),只需要?jiǎng)?chuàng)建和申明exchange即可。消費(fèi)者則是需要知道生產(chǎn)者,除了創(chuàng)建和申明queue后,還需要綁定queue和exchange的關(guān)系。又因?yàn)橄M(fèi)者和生產(chǎn)者在不同的進(jìn)程,即使生成者創(chuàng)建了exchange,消費(fèi)者也需要在本地隱式創(chuàng)建exchange對(duì)象。

          Message 解析

          消息對(duì)象,除了純粹的數(shù)據(jù)結(jié)構(gòu)外,也包含channel的引用,畢竟消息可以直接執(zhí)行ack動(dòng)作:

          class?Message:
          ????def?__init__(self,?body=None,?delivery_tag=None,
          ?????????????????content_type=None,?content_encoding=None,?delivery_info=None,
          ?????????????????properties=None,?headers=None,?postencode=None,
          ?????????????????accept=None,?channel=None,?**kwargs):
          ????????#?通道,主要的API來(lái)源
          ????????self.channel?=?channel
          ????????#?投遞標(biāo)簽,可以用來(lái)響應(yīng)
          ????????self.delivery_tag?=?delivery_tag
          ????????...
          ????????self.headers?=?headers?or?{}
          ????????self.body?=?body
          ????????...
          ????????self._state?=?'RECEIVED'

          消息本身還帶有四個(gè)狀態(tài):

          • RECEIVED?默認(rèn)狀態(tài)
          • ACK?完成ack響應(yīng)
          • REJECTED?拒絕消息
          • REQUEUED?重新投遞消息

          其中?{'ACK', 'REJECTED', 'REQUEUED'}?三個(gè)狀態(tài)的轉(zhuǎn)換都需要使用channel進(jìn)行操作broker,成功后再切換:

          def?ack(self,?multiple=False):
          ????#?回應(yīng)ACK
          ????self.channel.basic_ack(self.delivery_tag,?multiple=multiple)
          ????self._state?=?'ACK'

          def?reject(self,?requeue=False):
          ????#?拒絕(拋棄消息)
          ????self.channel.basic_reject(self.delivery_tag,?requeue=requeue)
          ????self._state?=?'REJECTED'

          def?requeue(self):
          ????#?拒絕(退回消息)(和reject區(qū)別在requeue=True)
          ????self.channel.basic_reject(self.delivery_tag,?requeue=True)
          ????self._state?=?'REQUEUED'

          消息上附帶的信息,通過(guò)不同的load方法進(jìn)行序列化:

          from?.serialization?import?loads

          @property
          def?payload(self):
          ????return?loads(self.body,?self.content_type,
          ?????????????????????self.content_encoding,?accept=self.accept)????

          Connection 解析

          Connection負(fù)責(zé)管理producer/consumer到broker的網(wǎng)絡(luò)連接:

          class?Connection:
          ????def?__init__(self,?hostname='localhost',?userid=None,
          ?????????????????password=None,?virtual_host=None,?port=None,?insist=False,
          ?????????????????ssl=False,?transport=None,?connect_timeout=5,
          ?????????????????transport_options=None,?login_method=None,?uri_prefix=None,
          ?????????????????heartbeat=0,?failover_strategy='round-robin',
          ?????????????????alternates=None,?**kwargs):
          ????????...
          ????????params?=?self._initial_params?=?{
          ????????????'hostname':?hostname,?'userid':?userid,
          ????????????'password':?password,?'virtual_host':?virtual_host,
          ????????????'port':?port,?'insist':?insist,?'ssl':?ssl,
          ????????????'transport':?transport,?'connect_timeout':?connect_timeout,
          ????????????'login_method':?login_method,?'heartbeat':?heartbeat
          ????????}
          ????????...
          ????????
          ????????self._init_params(**params)
          ????????...

          重點(diǎn)在_init_params中對(duì)各種支持AQMP協(xié)議的broker的管理, 比如redis,RobbitMQ:

          def?_init_params(self,?hostname,?userid,?password,?virtual_host,?port,
          ?????????????????insist,?ssl,?transport,?connect_timeout,
          ?????????????????login_method,?heartbeat):
          ????transport?=?transport?or?'amqp'
          ????if?transport?==?'amqp'?and?supports_librabbitmq():
          ????????transport?=?'librabbitmq'
          ????if?transport?==?'rediss'?and?ssl_available?and?not?ssl:
          ????????logger.warning(
          ????????????'Secure?redis?scheme?specified?(rediss)?with?no?ssl?'
          ????????????'options,?defaulting?to?insecure?SSL?behaviour.'
          ????????)
          ????????ssl?=?{'ssl_cert_reqs':?CERT_NONE}
          ????self.hostname?=?hostname
          ????self.userid?=?userid
          ????self.password?=?password
          ????self.login_method?=?login_method
          ????#?虛擬主機(jī)隔離
          ????self.virtual_host?=?virtual_host?or?self.virtual_host
          ????self.port?=?port?or?self.port
          ????self.insist?=?insist
          ????self.connect_timeout?=?connect_timeout
          ????self.ssl?=?ssl
          ????#?傳輸類
          ????self.transport_cls?=?transport
          ????self.heartbeat?=?heartbeat?and?float(heartbeat)

          配置完connection信息后,就需要?jiǎng)?chuàng)建網(wǎng)絡(luò)連接。這個(gè)過(guò)程通過(guò)調(diào)用connection屬性或者default_channel屬性時(shí)候自動(dòng)創(chuàng)建:

          @property
          def?connection(self):
          ????"""The?underlying?connection?object.

          ????Warning:
          ????????This?instance?is?transport?specific,?so?do?not
          ????????depend?on?the?interface?of?this?object.
          ????"
          ""
          ????if?not?self._closed:
          ????????if?not?self.connected:
          ????????????#?創(chuàng)建連接
          ????????????return?self._ensure_connection(
          ????????????????max_retries=1,?reraise_as_library_errors=False
          ????????????)
          ????????return?self._connection
          ????????
          @property
          def?default_channel(self):
          ????"""Default?channel.

          ????Created?upon?access?and?closed?when?the?connection?is?closed.

          ????Note:
          ????????Can?be?used?for?automatic?channel?handling?when?you?only?need?one
          ????????channel,?and?also?it?is?the?channel?implicitly?used?if
          ????????a?connection?is?passed?instead?of?a?channel,?to?functions?that
          ????????require?a?channel.
          ????"
          ""
          ????#?make?sure?we're?still?connected,?and?if?not?refresh.
          ????conn_opts?=?self._extract_failover_opts()
          ????#?創(chuàng)建連接
          ????self._ensure_connection(**conn_opts)

          ????if?self._default_channel?is?None:
          ????????self._default_channel?=?self.channel()
          ????return?self._default_channel

          連接創(chuàng)建完成后,繼續(xù)創(chuàng)建channel:

          def?channel(self):
          ????"""Create?and?return?a?new?channel."""
          ????self._debug('create?channel')
          ????chan?=?self.transport.create_channel(self.connection)
          ????return?chan

          def?create_transport(self):
          ????#?創(chuàng)建傳輸連接
          ????return?self.get_transport_cls()(client=self)

          def?get_transport_cls(self):
          ????"""Get?the?currently?used?transport?class."""
          ????transport_cls?=?self.transport_cls
          ????if?not?transport_cls?or?isinstance(transport_cls,?str):
          ????????transport_cls?=?get_transport_cls(transport_cls)
          ????return?transport_cls

          創(chuàng)建broker的連接過(guò)程,是通過(guò)transport的創(chuàng)建,其中細(xì)節(jié)涉及對(duì)不同類型的broker服務(wù)的適配,內(nèi)容挺多,我們下一章再進(jìn)行解析。

          Matcher && serialization

          Matcher負(fù)責(zé)處理消息的匹配機(jī)制,serialization復(fù)雜消息的序列化。兩者的實(shí)現(xiàn)方式類似,都使用注冊(cè)中心模式+策略模式實(shí)現(xiàn)。

          Matcher的注冊(cè)中心:

          class?MatcherRegistry:
          ????"""Pattern?matching?function?registry."""
          ????"""匹配器的注冊(cè)中心"""

          ????MatcherNotInstalled?=?MatcherNotInstalled
          ????matcher_pattern_first?=?["pcre",?]

          ????def?__init__(self):
          ????????self._matchers?=?{}
          ????????self._default_matcher?=?None

          #:?Global?registry?of?matchers.
          registry?=?MatcherRegistry()

          注冊(cè)glob(模糊)模式和pcre(正則)模式兩種策略:

          def?register_glob():
          ????"""Register?glob?into?default?registry."""
          ????"""使用glob(通配符)匹配"""
          ????registry.register('glob',?fnmatch)


          def?register_pcre():
          ????"""Register?pcre?into?default?registry."""
          ????"""使用正則匹配"""
          ????registry.register('pcre',?rematch)


          #?Register?the?base?matching?methods.
          register_glob()
          register_pcre()

          匹配消息的方法,就是使用模式進(jìn)行識(shí)別:

          def?match(self,?data,?pattern,?matcher=None,?matcher_kwargs=None):
          ????"""Call?the?matcher."""
          ????if?matcher?and?not?self._matchers.get(matcher):
          ????????raise?self.MatcherNotInstalled(
          ????????????f'No?matcher?installed?for?{matcher}'
          ????????)
          ????#?默認(rèn)使用通配符匹配
          ????match_func?=?self._matchers[matcher?or?'glob']
          ????#?通配符和正則匹配的傳參先后順序有差異
          ????if?matcher?in?self.matcher_pattern_first:
          ????????first_arg?=?bytes_to_str(pattern)
          ????????second_arg?=?bytes_to_str(data)
          ????else:
          ????????first_arg?=?bytes_to_str(data)
          ????????second_arg?=?bytes_to_str(pattern)
          ????return?match_func(first_arg,?second_arg,?**matcher_kwargs?or?{})

          Serializer的注冊(cè)中心:

          class?SerializerRegistry:
          ????"""The?registry?keeps?track?of?serialization?methods."""
          ????"""序列化方法的注冊(cè)中心"""

          ????def?__init__(self):
          ????????self._encoders?=?{}
          ????????self._decoders?=?{}
          ????????self._default_encode?=?None
          ????????self._default_content_type?=?None
          ????????self._default_content_encoding?=?None
          ????????#?記錄禁用的編解碼類型
          ????????self._disabled_content_types?=?set()
          ????????#?雙向字典,可以進(jìn)行互查
          ????????self.type_to_name?=?{}
          ????????self.name_to_type?=?{}

          #?全局單例,并且導(dǎo)出函數(shù)綁定,使用API更簡(jiǎn)介
          registry?=?SerializerRegistry()
          dumps?=?registry.dumps
          loads?=?registry.loads
          register?=?registry.register
          unregister?=?registry.unregister

          json, yaml, pickle和msgpack四種序列化策略的注冊(cè):

          def?register_json():
          ????"""Register?a?encoder/decoder?for?JSON?serialization."""
          ????from?kombu.utils?import?json?as?_json

          ????registry.register('json',?_json.dumps,?_json.loads,
          ??????????????????????content_type='application/json',
          ??????????????????????content_encoding='utf-8')

          def?register_yaml():
          ????"""Register?a?encoder/decoder?for?YAML?serialization.

          ????It?is?slower?than?JSON,?but?allows?for?more?data?types
          ????to?be?serialized.?Useful?if?you?need?to?send?data?such?as?dates

          ????"
          ""
          ????import?yaml
          ????registry.register('yaml',?yaml.safe_dump,?yaml.safe_load,
          ??????????????????????content_type='application/x-yaml',
          ??????????????????????content_encoding='utf-8')

          def?register_pickle():
          ????"""Register?pickle?serializer.

          ????The?fastest?serialization?method,?but?restricts
          ????you?to?python?clients.
          ????"
          ""
          ????def?pickle_dumps(obj,?dumper=pickle.dumps):
          ????????return?dumper(obj,?protocol=pickle_protocol)

          ????registry.register('pickle',?pickle_dumps,?unpickle,
          ??????????????????????content_type='application/x-python-serialize',
          ??????????????????????content_encoding='binary')

          def?register_msgpack():
          ????"""Register?msgpack?serializer.

          ????See?Also:
          ????????https://msgpack.org/.
          ????"
          ""
          ????pack?=?unpack?=?None
          ????import?msgpack
          ????from?msgpack?import?packb,?unpackb

          ????def?pack(s):
          ????????return?packb(s,?use_bin_type=True)

          ????def?unpack(s):
          ????????return?unpackb(s,?raw=False)
          ????????
          ????registry.register(
          ????????'msgpack',?pack,?unpack,
          ????????content_type='application/x-msgpack',
          ????????content_encoding='binary',
          ????)

          register_json()
          register_pickle()
          register_yaml()
          register_msgpack()

          反序列化的使用:

          #?kombu-5.0.0/kombu/serialization.py:285
          #?導(dǎo)出策略
          loads?=?registry.loads

          #?kombu-5.0.0/kombu/message.py:10
          from?.serialization?import?loads

          class?Message:
          ????def?_decode(self):
          ????????#?使用策略反序列化message-body
          ????????return?loads(self.body,?self.content_type,
          ?????????????????????self.content_encoding,?accept=self.accept)

          小結(jié)

          通過(guò)kombu的Producer可以發(fā)送消息到broker,使用Comsumer則可以消費(fèi)消息。發(fā)送消息的時(shí)候需要使用Exchange,用來(lái)將消費(fèi)分發(fā)到不同的目標(biāo)Queue;消費(fèi)消息的時(shí)候,需要使用Queue,Queue還需要通過(guò)綁定的方式和Exchange關(guān)聯(lián)起來(lái)。Exchange和Queue都是使用底層的channel進(jìn)行數(shù)據(jù)傳輸,所以需要進(jìn)綁定(binding);還需要在遠(yuǎn)程的broker中創(chuàng)建,所以創(chuàng)建后的的Exchange和Queue需要進(jìn)行申明(declare)。消息會(huì)附帶上投遞信息,進(jìn)行序列化后從生產(chǎn)者到broker轉(zhuǎn)發(fā)給消費(fèi)者,消費(fèi)者再使用投遞信息上的序列化約定,將消息反序列成業(yè)務(wù)信息。

          小技巧

          pickle打包函數(shù)

          pickle不僅支持?jǐn)?shù)據(jù)接口的序列化,還支持函數(shù)的序列化:

          python3
          Python?3.8.5?(v3.8.5:580fbb018f,?Jul?20?2020,?12:11:27)
          [Clang?6.0?(clang-600.0.57)]?on?darwin
          Type?"help",?"copyright",?"credits"?or?"license"?for?more?information.
          >>>?import?pickle
          >>>
          >>>?def?hello(msg):
          ...?????print("hello",?msg)
          ...
          >>>?p?=?pickle.dumps(hello)
          >>>?p
          b'\x80\x04\x95\x16\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x05hello\x94\x93\x94.'
          >>>
          >>>?q?=?pickle.loads(p)
          >>>
          >>>?q("python")
          hello?python
          >>>

          上面的hello函數(shù)可以通過(guò)pickle打包,再重新解包執(zhí)行。利用這個(gè)機(jī)制使用kombu,可以將producer進(jìn)程的函數(shù)發(fā)送到consumer進(jìn)程遠(yuǎn)程執(zhí)行。pickle支持的數(shù)據(jù)類型還挺豐富,官方文檔中介紹包括下面多種類型:

          The?following?types?can?be?pickled:

          *?None,?True,?and?False

          *?integers,?floating?point?numbers,?complex?numbers

          *?strings,?bytes,?bytearrays

          *?tuples,?lists,?sets,?and?dictionaries?containing?only?picklable?objects

          *?functions?defined?at?the?top?level?of?a?module?(using?def,?not?lambda)

          *?built-in?functions?defined?at?the?top?level?of?a?module

          *?classes?that?are?defined?at?the?top?level?of?a?module

          *?instances?of?such?classes?whose?__dict__?or?the?result?of?calling?__getstate__()?is?picklable?(see?section?Pickling?Class?Instances?for?details).

          配置類的簡(jiǎn)化

          Object提供了一種快速構(gòu)建對(duì)象的方法:

          class?Object:
          ????"""Common?base?class.

          ????Supports?automatic?kwargs->attributes?handling,?and?cloning.
          ????"
          ""

          ????attrs?=?()

          ????def?__init__(self,?*args,?**kwargs):
          ????????#?attrs?在子類中定義
          ????????for?name,?type_?in?self.attrs:
          ????????????value?=?kwargs.get(name)
          ????????????#?從字典參數(shù)給屬性動(dòng)態(tài)賦值
          ????????????if?value?is?not?None:
          ????????????????setattr(self,?name,?(type_?or?_any)(value))
          ????????????else:
          ????????????????try:
          ????????????????????getattr(self,?name)
          ????????????????except?AttributeError:
          ????????????????????setattr(self,?name,?None)

          Queue展示了這種方式的示例,比如max_length屬性:

          class?Queue(MaybeChannelBound):
          ????attrs?=?(
          ????????..
          ????????('max_length',?int),
          ????????...
          ????)
          ????def?__init__(self,?name='',?exchange=None,?routing_key='',
          ?????????????????channel=None,?bindings=None,?on_declared=None,
          ?????????????????**kwargs):
          ????????self.name?=?name?or?self.name
          ????????...
          ????
          ????def?queue_declare(self,?nowait=False,?passive=False,?channel=None):
          ????????...
          ????????queue_arguments?=?channel.prepare_queue_arguments(
          ????????????self.queue_arguments?or?{},
          ????????????expires=self.expires,
          ????????????message_ttl=self.message_ttl,
          ????????????max_length=self.max_length,
          ????????????max_length_bytes=self.max_length_bytes,
          ????????????max_priority=self.max_priority,
          ????????)
          ????????...

          在Queue的構(gòu)造函數(shù)中并沒有定義max_length屬性,但是queue_declare中卻可以直接使用這個(gè)屬性,可以對(duì)比name屬性感受一下差異。這對(duì)我們簡(jiǎn)化定義屬性很多的對(duì)象有幫助,比如一些配置類。

          使用count提供自增ID

          itertools.count提供了一種通過(guò)迭代器生成遞增ID的方法:

          >>>?from?itertools?import?count
          >>>
          >>>?for?i?in?count():
          ...?????if?i?%?10?==?0:
          ...?????????????print(i)
          ...?????if?i>50:
          ...?????????????break
          ...
          0
          10
          20
          30
          40
          50

          參考鏈接

          • https://github.com/celery/kombu
          • Talking to RabbitMQ with Python and Kombu https://medium.com/python-pandemonium/talking-to-rabbitmq-with-python-and-kombu-6cbee93b1298
          • 一篇文章講透徹了AMQP協(xié)議 https://jishuin.proginn.com/p/763bfbd2a068
          Python貓技術(shù)交流群開放啦!群里既有國(guó)內(nèi)一二線大廠在職員工,也有國(guó)內(nèi)外高校在讀學(xué)生,既有十多年碼齡的編程老鳥,也有中小學(xué)剛剛?cè)腴T的新人,學(xué)習(xí)氛圍良好!想入群的同學(xué),請(qǐng)?jiān)诠?hào)內(nèi)回復(fù)『交流群』,獲取貓哥的微信(謝絕廣告黨,非誠(chéng)勿擾!)~


          還不過(guò)癮?試試它們




          11 個(gè)最佳的 Python 編譯器和解釋器

          Python 常用庫(kù)之 psutil 使用指南

          Python進(jìn)階:自定義對(duì)象實(shí)現(xiàn)切片功能

          為什么 Python 3 ?把 print 改為函數(shù)?

          Python 的縮進(jìn)是不是反人類的設(shè)計(jì)?

          Python 處理日期與時(shí)間的全面總結(jié)(7000字)


          如果你覺得本文有幫助
          請(qǐng)慷慨分享點(diǎn)贊,感謝啦

          瀏覽 41
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  www.操操操操 | 特级日本欧美日韩亚洲精品综合免费在线 | 欧美性猛交XXXXX按摩欧美 | 伊人网在线成人在线视频 | 欧美A片视频 |