RabbitMQ 概念和應(yīng)用詳解
RabbitMQ概述
RabbitMQ可以做什么?
RabbitMQ是實(shí)現(xiàn)AMQP(高級(jí)消息隊(duì)列協(xié)議)的消息中間件的一種,可用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,主要有以下的技術(shù)亮點(diǎn):
可靠性
靈活的路由
集群部署
高可用的隊(duì)列消息
可視化的管理工具
RabbitMQ主要用于系統(tǒng)間的雙向解耦,當(dāng)生產(chǎn)者(productor)產(chǎn)生大量的數(shù)據(jù)時(shí),消費(fèi)者(consumer)無法快速的消費(fèi)信息,那么就需要一個(gè)類似于中間件的代理服務(wù)器,用來處理和保存這些數(shù)據(jù),RabbitMQ就扮演了這個(gè)角色。
如何使用RabbitMQ
Erlang語言包
RabbitMQ安裝包
基本概念
1.Broker
用來處理數(shù)據(jù)的消息隊(duì)列服務(wù)器實(shí)體
2.虛擬主機(jī)(vhost)
由RabbitMQ服務(wù)器創(chuàng)建的虛擬消息主機(jī),擁有自己的權(quán)限機(jī)制,一個(gè)broker里可以開設(shè)多個(gè)vhost,用于不同用戶的權(quán)限隔離,vhost之間是也完全隔離的。
3.生產(chǎn)者(productor)
產(chǎn)生用于消息通信的數(shù)據(jù)
4.信道(channel)
消息通道,在AMQP中可以建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。
5.交換機(jī)(exchange)
(1)接受消息,轉(zhuǎn)發(fā)消息到綁定的隊(duì)列,總共有四種類型的交換器:direct,fanout,topic,headers。
direct:轉(zhuǎn)發(fā)消息到routing-key指定的隊(duì)列

fanout:轉(zhuǎn)發(fā)消息到所有綁定的隊(duì)列,類似于一種廣播發(fā)送的方式。
topic:按照規(guī)則轉(zhuǎn)發(fā)消息,這種規(guī)則多為模式匹配,也顯得更加靈活

(2).交換器在RabbitMQ中是一個(gè)存在的實(shí)體,不能改變,如有需要只能刪除重建。
(3).topic類型的交換器利用匹配規(guī)則分析消息的routing-key屬性。
(4).屬性
持久性:聲明時(shí)durable屬性為true
自動(dòng)刪除:綁定的queue刪除也跟著刪除
惰性:不會(huì)自動(dòng)創(chuàng)建
6.隊(duì)列(queue)
(1).隊(duì)列是RabbitMQ的內(nèi)部對(duì)象,存儲(chǔ)消息
(2).可以動(dòng)態(tài)的增加消費(fèi)者,隊(duì)列將接受到的消息以輪詢(round-robin)的方式均勻的分配給多個(gè)消費(fèi)者
(3).隊(duì)列的屬性
持久性:如果啟用,隊(duì)列將會(huì)在server重啟之前有效
自動(dòng)刪除:消費(fèi)者停止使用之后就會(huì)自動(dòng)刪除
惰性:不會(huì)自動(dòng)創(chuàng)建
排他性:如果啟用,隊(duì)列只能被聲明它的消費(fèi)者使用。

7.兩個(gè)key
routing-key:消息不能直接發(fā)到queues,需要先發(fā)送到exchanges,routing-key指定queues名稱,exchanges通過routing-key來識(shí)別與之綁定的queues
channel.queue_publish(exchange=exchange_name,
routing-key="rabbitmq",
body="openstack")binding-key:主要是用來表示exchanges和queues之間的關(guān)系,為了區(qū)別queue_publish的routing-key,就稱作binding-key。
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing-key="rabbitmq")
8.綁定(binding)
表示交換機(jī)和隊(duì)列之間的關(guān)系,在進(jìn)行綁定時(shí),帶有一個(gè)額外的參數(shù)binding-key,來和routing-key相匹配。
9.消費(fèi)者(consumer)
監(jiān)聽消息隊(duì)列來進(jìn)行消息數(shù)據(jù)的讀取
10.高可用性(HA)
(1).在consumer處理完消息后,會(huì)發(fā)送消息ACK,通知通知RabbitMQ消息已被處理,可以從內(nèi)存刪除。如果消費(fèi)者因宕機(jī)或鏈接失敗等原因沒有發(fā)送ACK,則RabbitMQ會(huì)將消息重新發(fā)送給其他監(jiān)聽在隊(duì)列的下一個(gè)消費(fèi)者。
channel.basicConsume(queuename, noAck=false, consumer);(2).消息和隊(duì)列的持久化
(3).鏡像隊(duì)列,實(shí)現(xiàn)不同節(jié)點(diǎn)之間的元數(shù)據(jù)和消息同步
RabbitMQ在OpenStack中的應(yīng)用
RPC之neutron專題
基于RabbitMQ的RPC消息通信是neutron中跨模塊進(jìn)行方法調(diào)用的很重要的一種方式,根據(jù)上面的描述,要組成一個(gè)完整的RPC通信結(jié)構(gòu),需要信息的生產(chǎn)者和消費(fèi)者。
client端:用于產(chǎn)生rpc消息。
server端:用于監(jiān)聽消息數(shù)據(jù)并進(jìn)行相應(yīng)的處理。
1.neutron-agent中的RPC
在dhcp_agent、l3_agent、metadata_agent,metering_agent的main函數(shù)中都存在一段創(chuàng)建一個(gè)rpc服務(wù)端的代碼,下面以dhcp_agent為例。
def main():
register_options(cfg.CONF)
common_config.init(sys.argv[1:])
config.setup_logging()
server = neutron_service.Service.create(
binary='neutron-dhcp-agent',
topic=topics.DHCP_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
service.launch(cfg.CONF, server).wait()
最核心的,也是跟rpc相關(guān)的部分包括兩部分,首先是創(chuàng)建rpc服務(wù)端。
server = neutron_service.Service.create(
binary='neutron-dhcp-agent',
topic=topics.DHCP_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
該代碼實(shí)際上創(chuàng)建了一個(gè)rpc服務(wù)端,監(jiān)聽指定的topic并運(yùn)行manager上的tasks。
create()方法返回一個(gè)neutron.service.Service對(duì)象,neutron.service.Service繼承自neutron.common.rpc.Service類。
首先看neutron.common.rpc.Service類,該類定義了start方法,該方法主要完成兩件事情:一件事情是將manager添加到endpoints中;一件是創(chuàng)建rpc的consumer,分別監(jiān)聽topic的隊(duì)列消息。
而在neutron.service.Service類中,初始化中生成了一個(gè)manager實(shí)例(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport);并為start方法添加了周期性執(zhí)行report_state方法和periodic_tasks方法。report_state方法沒有具體實(shí)現(xiàn),periodic_tasks方法則調(diào)用manager的periodic_tasks方法。
manager實(shí)例(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport)在初始化的時(shí)候首先創(chuàng)建一個(gè)rpc的client端,通過代碼
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)該client端實(shí)際上定義了report_state方法,可以狀態(tài)以rpc消息的方式發(fā)送給plugin。
manager在初始化后,還會(huì)指定周期性運(yùn)行_report_state方法,實(shí)際上就是調(diào)用client端的report_state方法。
至此,對(duì)rpc服務(wù)端的創(chuàng)建算是完成了,之后執(zhí)行代碼。
service.launch(server).wait()
service.launch(server)方法首先會(huì)將server放到協(xié)程組中,并調(diào)用server的start方法來啟動(dòng)server。

2.neutron-plugin中的RPC
主要對(duì)ML2Plugin進(jìn)行分析,包括兩個(gè)類:RpcCallbacks和AgentNotifierApi。
RpcCallbacks:負(fù)責(zé)當(dāng)agent往plugin發(fā)出rpc請(qǐng)求時(shí)候,plugin實(shí)現(xiàn)請(qǐng)求的相關(guān)動(dòng)作,除了繼承自父類(dhcp rpc、dvr rpc、sg_db rpc和tunnel rpc)中的方法,還包括get_port_from_device、get_device_details、get_devices_details_list、update_device_down、update_device_up、get_dvr_mac_address_by_host、get_compute_ports_on_host_by_subnet、get_subnet_for_dvr等方法。
AgentNotifierApi:負(fù)責(zé)當(dāng)plugin往agent發(fā)出rpc請(qǐng)求(plugin通知agent)的時(shí)候,plugin端的方法。
def start_rpc_listeners(self):
"""RpcCallbacks中實(shí)現(xiàn)的方法:Start the RPC loop to let the plugin communicate with agents."""
self._setup_rpc()
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
return self.conn.consume_in_threads()
創(chuàng)建一個(gè)通知rpc的客戶端,用于向OVS的agent發(fā)出通知。所有plugin都需要有這樣一個(gè)發(fā)出通知消息的客戶端,創(chuàng)建了一個(gè)OVS agent的通知rpc客戶端。之后,創(chuàng)建兩個(gè)跟service agent相關(guān)的consumer,分別監(jiān)聽topics.PLUGIN

ovs_neutron_agent也會(huì)創(chuàng)建RPC的consumer,用來監(jiān)聽topics.UPDATE、topics.DELETE等操作。
def setup_rpc(self):
self.agent_id = 'ovs-agent-%s' % self.conf.host
self.topic = topics.AGENT
self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.PORT, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
[topics.DVR, topics.UPDATE],
[topics.NETWORK, topics.UPDATE]]
3.neutron-server中的RPC
這個(gè)rpc服務(wù)端主要通過neutron.server中主函數(shù)中代碼執(zhí)行
neutron_rpc = service.serve_rpc()
方法的實(shí)現(xiàn)代碼(目錄:neutron/neutron/service.py)如下
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
service_plugins = (
manager.NeutronManager.get_service_plugins().values())
if cfg.CONF.rpc_workers < 1:
cfg.CONF.set_override('rpc_workers', 1)
if not plugin.rpc_workers_supported():
LOG.debug("Active plugin doesn't implement start_rpc_listeners")
if 0 < cfg.CONF.rpc_workers:
LOG.error(_LE("'rpc_workers = %d' ignored because "
"start_rpc_listeners is not implemented."),
cfg.CONF.rpc_workers)
raise NotImplementedError()
try:
rpc = RpcWorker(service_plugins)
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
session.dispose()
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
if (cfg.CONF.rpc_state_report_workers > 0 and
plugin.rpc_state_report_workers_supported()):
rpc_state_rep = RpcReportsWorker([plugin])
LOG.debug('using launcher for state reports rpc, workers=%s',
cfg.CONF.rpc_state_report_workers)
launcher.launch_service(
rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
return launcher
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE('Unrecoverable error: please check log for '
'details.'))
其中,RpcWorker(plugin)主要通過調(diào)用plugin的方法來創(chuàng)建rpc服務(wù)端,最重要的工作是調(diào)用plugin的start_rpc_listeners來監(jiān)聽消息隊(duì)列:
self._servers = self._plugin.start_rpc_listeners()
該方法在大多數(shù)plugin中并未被實(shí)現(xiàn),目前ml2支持該方法。
在neutron.plugin.ml2.plugin.ML2Plugin類中,該方法創(chuàng)建了一個(gè)topic為topics.PLUGIN的消費(fèi)rpc。
def start_rpc_listeners(self):
self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
agents_db.AgentExtRpcCallback()]
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
return self.conn.consume_in_threads()
RPC之nova專題
在Openstack中,每一個(gè)Nova服務(wù)初始化時(shí)會(huì)創(chuàng)建兩個(gè)隊(duì)列,一個(gè)名為“NODE-TYPE.NODE-ID”,另一個(gè)名為“NODE-TYPE”,NODE-TYPE是指服務(wù)的類型,NODE-ID指節(jié)點(diǎn)名稱。

1.nova中實(shí)現(xiàn)exchange的種類
direct:初始化中,各個(gè)模塊對(duì)每一條系統(tǒng)消息自動(dòng)生成多個(gè)隊(duì)列放入RabbitMQ服務(wù)器中,隊(duì)列中綁定的binding-key要與routing-key匹配
topic:各個(gè)模塊也會(huì)自動(dòng)生成兩個(gè)隊(duì)列放入RabbitMQ服務(wù)器中。
2.nova中調(diào)用RPC的方式
RPC.CALL:用于請(qǐng)求和響應(yīng)方式
RPC.CAST:只是提供單向請(qǐng)求
3.nova中模塊的邏輯功能
Invoker:向消息隊(duì)列中發(fā)送系統(tǒng)請(qǐng)求信息,如Nova-API和Nova-Scheduler,通過RPC.CALL和RPC.CAST兩個(gè)進(jìn)程發(fā)送系統(tǒng)請(qǐng)求消息。
Worker:從消息隊(duì)列中獲取Invoker模塊發(fā)送的系統(tǒng)請(qǐng)求消息以及向Invoker模塊回復(fù)系統(tǒng)響應(yīng)消息,如Nova-Compute、Nova-Volume和Nova-Network,對(duì)RPC.CALL做出響應(yīng)。
4.nova中的exchange domain
direct exchange domain: Topic消息生產(chǎn)者(Nova-API或者Nova-Scheduler)與Topic交換器生成邏輯連接,通過PRC.CALL或者RPC.CAST進(jìn)程將系統(tǒng)請(qǐng)求消息發(fā)往Topic交換器。交換器根據(jù)不同的routing-key將系統(tǒng)請(qǐng)求消息轉(zhuǎn)發(fā)到不同的類型的消息隊(duì)列。Topic消息消費(fèi)者探測(cè)到新消息已進(jìn)入響應(yīng)隊(duì)列,立即從隊(duì)列中接收消息并調(diào)用執(zhí)行系統(tǒng)消息所請(qǐng)求的應(yīng)用程序。
點(diǎn)到點(diǎn)消息隊(duì)列:Topic消息消費(fèi)者應(yīng)用程序接收RPC.CALL的遠(yuǎn)程調(diào)用請(qǐng)求,并在執(zhí)行相關(guān)計(jì)算任務(wù)之后將結(jié)果以系統(tǒng)響應(yīng)消息的方式通過Direct交換器反饋給Direct消息消費(fèi)者。
共享消息隊(duì)列:Topic消息消費(fèi)者應(yīng)用程序只是接收RPC.CAST的遠(yuǎn)程調(diào)用請(qǐng)求來執(zhí)行相關(guān)的計(jì)算任務(wù),并沒有響應(yīng)消息反饋。
topic exchange domain: Direct交換域并不是獨(dú)立運(yùn)作,而是受限于Topic交換域中RPC.CALL的遠(yuǎn)程調(diào)用流程與結(jié)果,每一個(gè)RPC.CALL激活一次Direct消息交換的運(yùn)作。

以nova啟動(dòng)虛擬機(jī)的過程為例,詳細(xì)介紹RPC通信過程。

RPC.CAST缺少了系統(tǒng)消息響應(yīng)流程。一個(gè)Topic消息生產(chǎn)者發(fā)送系統(tǒng)請(qǐng)求消息到Topic交換器,Topic交換器根據(jù)消息的Routing Key將消息轉(zhuǎn)發(fā)至共享消息隊(duì)列,與共享消息隊(duì)列相連的所有Topic消費(fèi)者接收該系統(tǒng)請(qǐng)求消息,并把它傳遞給響應(yīng)的Worker進(jìn)行處理,其調(diào)用流程如圖所示:

source: //chyufly.github.io/blog/2016/04/13/rabbitmq-introduction

喜歡,在看
