<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ 概念和應(yīng)用詳解

          共 13776字,需瀏覽 28分鐘

           ·

          2021-09-10 14:00

          RabbitMQ概述

          RabbitMQ可以做什么?

          RabbitMQ是實(shí)現(xiàn)AMQP(高級(jí)消息隊(duì)列協(xié)議)的消息中間件的一種,可用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,主要有以下的技術(shù)亮點(diǎn):

          • 可靠性

          • 靈活的路由

          • 集群部署

          • 高可用的隊(duì)列消息

          • 可視化的管理工具

          RabbitMQ主要用于系統(tǒng)間的雙向解耦,當(dāng)生產(chǎn)者(productor)產(chǎn)生大量的數(shù)據(jù)時(shí),消費(fèi)者(consumer)無法快速的消費(fèi)信息,那么就需要一個(gè)類似于中間件的代理服務(wù)器,用來處理和保存這些數(shù)據(jù),RabbitMQ就扮演了這個(gè)角色。

          如何使用RabbitMQ

          • Erlang語言包

          • RabbitMQ安裝包

          基本概念

          1.Broker

          用來處理數(shù)據(jù)的消息隊(duì)列服務(wù)器實(shí)體

          2.虛擬主機(jī)(vhost)

          由RabbitMQ服務(wù)器創(chuàng)建的虛擬消息主機(jī),擁有自己的權(quán)限機(jī)制,一個(gè)broker里可以開設(shè)多個(gè)vhost,用于不同用戶的權(quán)限隔離,vhost之間是也完全隔離的。

          3.生產(chǎn)者(productor)

          產(chǎn)生用于消息通信的數(shù)據(jù)

          4.信道(channel)

          消息通道,在AMQP中可以建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。

          5.交換機(jī)(exchange)

          (1)接受消息,轉(zhuǎn)發(fā)消息到綁定的隊(duì)列,總共有四種類型的交換器:direct,fanout,topic,headers。

          • direct:轉(zhuǎn)發(fā)消息到routing-key指定的隊(duì)列

          • fanout:轉(zhuǎn)發(fā)消息到所有綁定的隊(duì)列,類似于一種廣播發(fā)送的方式。

          • topic:按照規(guī)則轉(zhuǎn)發(fā)消息,這種規(guī)則多為模式匹配,也顯得更加靈活

          (2).交換器在RabbitMQ中是一個(gè)存在的實(shí)體,不能改變,如有需要只能刪除重建。

          (3).topic類型的交換器利用匹配規(guī)則分析消息的routing-key屬性。

          (4).屬性

          • 持久性:聲明時(shí)durable屬性為true

          • 自動(dòng)刪除:綁定的queue刪除也跟著刪除

          • 惰性:不會(huì)自動(dòng)創(chuàng)建

          6.隊(duì)列(queue)

          (1).隊(duì)列是RabbitMQ的內(nèi)部對(duì)象,存儲(chǔ)消息

          (2).可以動(dòng)態(tài)的增加消費(fèi)者,隊(duì)列將接受到的消息以輪詢(round-robin)的方式均勻的分配給多個(gè)消費(fèi)者

          (3).隊(duì)列的屬性

          • 持久性:如果啟用,隊(duì)列將會(huì)在server重啟之前有效

          • 自動(dòng)刪除:消費(fèi)者停止使用之后就會(huì)自動(dòng)刪除

          • 惰性:不會(huì)自動(dòng)創(chuàng)建

          • 排他性:如果啟用,隊(duì)列只能被聲明它的消費(fèi)者使用。

          7.兩個(gè)key

          • routing-key:消息不能直接發(fā)到queues,需要先發(fā)送到exchanges,routing-key指定queues名稱,exchanges通過routing-key來識(shí)別與之綁定的queues

          channel.queue_publish(exchange=exchange_name,
                               routing-key="rabbitmq",
                               body="openstack")
          • binding-key:主要是用來表示exchanges和queues之間的關(guān)系,為了區(qū)別queue_publish的routing-key,就稱作binding-key。

          channel.queue_bind(exchange=exchange_name,
                            queue=queue_name,
                            routing-key="rabbitmq")

          8.綁定(binding)

          表示交換機(jī)和隊(duì)列之間的關(guān)系,在進(jìn)行綁定時(shí),帶有一個(gè)額外的參數(shù)binding-key,來和routing-key相匹配。

          9.消費(fèi)者(consumer)

          監(jiān)聽消息隊(duì)列來進(jìn)行消息數(shù)據(jù)的讀取

          10.高可用性(HA)

          (1).在consumer處理完消息后,會(huì)發(fā)送消息ACK,通知通知RabbitMQ消息已被處理,可以從內(nèi)存刪除。如果消費(fèi)者因宕機(jī)或鏈接失敗等原因沒有發(fā)送ACK,則RabbitMQ會(huì)將消息重新發(fā)送給其他監(jiān)聽在隊(duì)列的下一個(gè)消費(fèi)者。

          channel.basicConsume(queuename, noAck=false, consumer);

          (2).消息和隊(duì)列的持久化

          (3).鏡像隊(duì)列,實(shí)現(xiàn)不同節(jié)點(diǎn)之間的元數(shù)據(jù)和消息同步

          RabbitMQ在OpenStack中的應(yīng)用

          RPC之neutron專題

          基于RabbitMQ的RPC消息通信是neutron中跨模塊進(jìn)行方法調(diào)用的很重要的一種方式,根據(jù)上面的描述,要組成一個(gè)完整的RPC通信結(jié)構(gòu),需要信息的生產(chǎn)者和消費(fèi)者。

          • client端:用于產(chǎn)生rpc消息。

          • server端:用于監(jiān)聽消息數(shù)據(jù)并進(jìn)行相應(yīng)的處理。

          1.neutron-agent中的RPC

          在dhcp_agent、l3_agent、metadata_agent,metering_agent的main函數(shù)中都存在一段創(chuàng)建一個(gè)rpc服務(wù)端的代碼,下面以dhcp_agent為例。

          def main():
              register_options(cfg.CONF)
              common_config.init(sys.argv[1:])
              config.setup_logging()
              server = neutron_service.Service.create(
                  binary='neutron-dhcp-agent',
                  topic=topics.DHCP_AGENT,
                  report_interval=cfg.CONF.AGENT.report_interval,
                  manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
              service.launch(cfg.CONF, server).wait()

          最核心的,也是跟rpc相關(guān)的部分包括兩部分,首先是創(chuàng)建rpc服務(wù)端。

          server = neutron_service.Service.create(
              binary='neutron-dhcp-agent',
              topic=topics.DHCP_AGENT,
              report_interval=cfg.CONF.AGENT.report_interval,
              manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')

          該代碼實(shí)際上創(chuàng)建了一個(gè)rpc服務(wù)端,監(jiān)聽指定的topic并運(yùn)行manager上的tasks。

          create()方法返回一個(gè)neutron.service.Service對(duì)象,neutron.service.Service繼承自neutron.common.rpc.Service類。

          首先看neutron.common.rpc.Service類,該類定義了start方法,該方法主要完成兩件事情:一件事情是將manager添加到endpoints中;一件是創(chuàng)建rpc的consumer,分別監(jiān)聽topic的隊(duì)列消息。

          而在neutron.service.Service類中,初始化中生成了一個(gè)manager實(shí)例(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport);并為start方法添加了周期性執(zhí)行report_state方法和periodic_tasks方法。report_state方法沒有具體實(shí)現(xiàn),periodic_tasks方法則調(diào)用manager的periodic_tasks方法。

          manager實(shí)例(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport)在初始化的時(shí)候首先創(chuàng)建一個(gè)rpc的client端,通過代碼

           self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

          該client端實(shí)際上定義了report_state方法,可以狀態(tài)以rpc消息的方式發(fā)送給plugin。

          manager在初始化后,還會(huì)指定周期性運(yùn)行_report_state方法,實(shí)際上就是調(diào)用client端的report_state方法。

          至此,對(duì)rpc服務(wù)端的創(chuàng)建算是完成了,之后執(zhí)行代碼。

          service.launch(server).wait()

          service.launch(server)方法首先會(huì)將server放到協(xié)程組中,并調(diào)用server的start方法來啟動(dòng)server。

          2.neutron-plugin中的RPC

          主要對(duì)ML2Plugin進(jìn)行分析,包括兩個(gè)類:RpcCallbacks和AgentNotifierApi。

          • RpcCallbacks:負(fù)責(zé)當(dāng)agent往plugin發(fā)出rpc請(qǐng)求時(shí)候,plugin實(shí)現(xiàn)請(qǐng)求的相關(guān)動(dòng)作,除了繼承自父類(dhcp rpc、dvr rpc、sg_db rpc和tunnel rpc)中的方法,還包括get_port_from_device、get_device_details、get_devices_details_list、update_device_down、update_device_up、get_dvr_mac_address_by_host、get_compute_ports_on_host_by_subnet、get_subnet_for_dvr等方法。

          • AgentNotifierApi:負(fù)責(zé)當(dāng)plugin往agent發(fā)出rpc請(qǐng)求(plugin通知agent)的時(shí)候,plugin端的方法。

          def start_rpc_listeners(self):
              """RpcCallbacks中實(shí)現(xiàn)的方法:Start the RPC loop to let the plugin communicate with agents."""
              self._setup_rpc()
              self.topic = topics.PLUGIN
              self.conn = n_rpc.create_connection(new=True)
              self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
              return self.conn.consume_in_threads()

          創(chuàng)建一個(gè)通知rpc的客戶端,用于向OVS的agent發(fā)出通知。所有plugin都需要有這樣一個(gè)發(fā)出通知消息的客戶端,創(chuàng)建了一個(gè)OVS agent的通知rpc客戶端。之后,創(chuàng)建兩個(gè)跟service agent相關(guān)的consumer,分別監(jiān)聽topics.PLUGIN

          ovs_neutron_agent也會(huì)創(chuàng)建RPC的consumer,用來監(jiān)聽topics.UPDATE、topics.DELETE等操作。

          def setup_rpc(self):
                  self.agent_id = 'ovs-agent-%s' % self.conf.host
                  self.topic = topics.AGENT
                  self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
                  self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
                  self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
                  self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
                  # RPC network init
                  self.context = context.get_admin_context_without_session()
                  # Handle updates from service
                  self.endpoints = [self]
                  # Define the listening consumers for the agent
                  consumers = [[topics.PORT, topics.UPDATE],
                               [topics.PORT, topics.DELETE],
                               [constants.TUNNEL, topics.UPDATE],
                               [constants.TUNNEL, topics.DELETE],
                               [topics.SECURITY_GROUP, topics.UPDATE],
                               [topics.DVR, topics.UPDATE],
                               [topics.NETWORK, topics.UPDATE]]

          3.neutron-server中的RPC

          這個(gè)rpc服務(wù)端主要通過neutron.server中主函數(shù)中代碼執(zhí)行

          neutron_rpc = service.serve_rpc()

          方法的實(shí)現(xiàn)代碼(目錄:neutron/neutron/service.py)如下

          def serve_rpc():
              plugin = manager.NeutronManager.get_plugin()
              service_plugins = (
                  manager.NeutronManager.get_service_plugins().values())
              if cfg.CONF.rpc_workers < 1:
                  cfg.CONF.set_override('rpc_workers'1)
              if not plugin.rpc_workers_supported():
                  LOG.debug("Active plugin doesn't implement start_rpc_listeners")
                  if 0 < cfg.CONF.rpc_workers:
                      LOG.error(_LE("'rpc_workers = %d' ignored because "
                                    "start_rpc_listeners is not implemented."),
                                cfg.CONF.rpc_workers)
                  raise NotImplementedError()
              try:
                  rpc = RpcWorker(service_plugins)
                  LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
                  session.dispose()
                  launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
                  launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
                  if (cfg.CONF.rpc_state_report_workers > 0 and
                      plugin.rpc_state_report_workers_supported()):
                      rpc_state_rep = RpcReportsWorker([plugin])
                      LOG.debug('using launcher for state reports rpc, workers=%s',
                                cfg.CONF.rpc_state_report_workers)
                      launcher.launch_service(
                          rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
                  return launcher
              except Exception:
                  with excutils.save_and_reraise_exception():
                      LOG.exception(_LE('Unrecoverable error: please check log for '
                                        'details.'))

          其中,RpcWorker(plugin)主要通過調(diào)用plugin的方法來創(chuàng)建rpc服務(wù)端,最重要的工作是調(diào)用plugin的start_rpc_listeners來監(jiān)聽消息隊(duì)列:

          self._servers = self._plugin.start_rpc_listeners()

          該方法在大多數(shù)plugin中并未被實(shí)現(xiàn),目前ml2支持該方法。

          在neutron.plugin.ml2.plugin.ML2Plugin類中,該方法創(chuàng)建了一個(gè)topic為topics.PLUGIN的消費(fèi)rpc。

          def start_rpc_listeners(self):
                  self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
                                    agents_db.AgentExtRpcCallback()]
                  self.topic = topics.PLUGIN
                  self.conn = n_rpc.create_connection(new=True)
                  self.conn.create_consumer(self.topic, self.endpoints,
                                            fanout=False)
                  return self.conn.consume_in_threads()

          RPC之nova專題

          在Openstack中,每一個(gè)Nova服務(wù)初始化時(shí)會(huì)創(chuàng)建兩個(gè)隊(duì)列,一個(gè)名為“NODE-TYPE.NODE-ID”,另一個(gè)名為“NODE-TYPE”,NODE-TYPE是指服務(wù)的類型,NODE-ID指節(jié)點(diǎn)名稱。

          1.nova中實(shí)現(xiàn)exchange的種類

          • direct:初始化中,各個(gè)模塊對(duì)每一條系統(tǒng)消息自動(dòng)生成多個(gè)隊(duì)列放入RabbitMQ服務(wù)器中,隊(duì)列中綁定的binding-key要與routing-key匹配

          • topic:各個(gè)模塊也會(huì)自動(dòng)生成兩個(gè)隊(duì)列放入RabbitMQ服務(wù)器中。

          2.nova中調(diào)用RPC的方式

          • RPC.CALL:用于請(qǐng)求和響應(yīng)方式

          • RPC.CAST:只是提供單向請(qǐng)求

          3.nova中模塊的邏輯功能

          • Invoker:向消息隊(duì)列中發(fā)送系統(tǒng)請(qǐng)求信息,如Nova-API和Nova-Scheduler,通過RPC.CALL和RPC.CAST兩個(gè)進(jìn)程發(fā)送系統(tǒng)請(qǐng)求消息。

          • Worker:從消息隊(duì)列中獲取Invoker模塊發(fā)送的系統(tǒng)請(qǐng)求消息以及向Invoker模塊回復(fù)系統(tǒng)響應(yīng)消息,如Nova-Compute、Nova-Volume和Nova-Network,對(duì)RPC.CALL做出響應(yīng)。

          4.nova中的exchange domain

          • direct exchange domain: Topic消息生產(chǎn)者(Nova-API或者Nova-Scheduler)與Topic交換器生成邏輯連接,通過PRC.CALL或者RPC.CAST進(jìn)程將系統(tǒng)請(qǐng)求消息發(fā)往Topic交換器。交換器根據(jù)不同的routing-key將系統(tǒng)請(qǐng)求消息轉(zhuǎn)發(fā)到不同的類型的消息隊(duì)列。Topic消息消費(fèi)者探測(cè)到新消息已進(jìn)入響應(yīng)隊(duì)列,立即從隊(duì)列中接收消息并調(diào)用執(zhí)行系統(tǒng)消息所請(qǐng)求的應(yīng)用程序。

            • 點(diǎn)到點(diǎn)消息隊(duì)列:Topic消息消費(fèi)者應(yīng)用程序接收RPC.CALL的遠(yuǎn)程調(diào)用請(qǐng)求,并在執(zhí)行相關(guān)計(jì)算任務(wù)之后將結(jié)果以系統(tǒng)響應(yīng)消息的方式通過Direct交換器反饋給Direct消息消費(fèi)者。

            • 共享消息隊(duì)列:Topic消息消費(fèi)者應(yīng)用程序只是接收RPC.CAST的遠(yuǎn)程調(diào)用請(qǐng)求來執(zhí)行相關(guān)的計(jì)算任務(wù),并沒有響應(yīng)消息反饋。

          • topic exchange domain: Direct交換域并不是獨(dú)立運(yùn)作,而是受限于Topic交換域中RPC.CALL的遠(yuǎn)程調(diào)用流程與結(jié)果,每一個(gè)RPC.CALL激活一次Direct消息交換的運(yùn)作。

          以nova啟動(dòng)虛擬機(jī)的過程為例,詳細(xì)介紹RPC通信過程。

          RPC.CAST缺少了系統(tǒng)消息響應(yīng)流程。一個(gè)Topic消息生產(chǎn)者發(fā)送系統(tǒng)請(qǐng)求消息到Topic交換器,Topic交換器根據(jù)消息的Routing Key將消息轉(zhuǎn)發(fā)至共享消息隊(duì)列,與共享消息隊(duì)列相連的所有Topic消費(fèi)者接收該系統(tǒng)請(qǐng)求消息,并把它傳遞給響應(yīng)的Worker進(jìn)行處理,其調(diào)用流程如圖所示:


          source:  //chyufly.github.io/blog/2016/04/13/rabbitmq-introduction

          喜歡,在看



          瀏覽 43
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  一区二区三区人妖视频 | 手机免费观看AV | 亚洲AV男人天堂 | 国产又黄又硬又粗 | 一区二区无码 无修正 |