10 分鐘入門 Python Rpc 實現(xiàn)
XML-RPC 是一種遠(yuǎn)程過程調(diào)用方法,它使用通過 HTTP 傳遞的 XML 作為載體。有了它,客戶端可以在遠(yuǎn)程服務(wù)器上調(diào)用帶參數(shù)的服務(wù)器方法(服務(wù)器以 URI 命名)并獲取結(jié)構(gòu)化的數(shù)據(jù)。python自帶xmlrpc實現(xiàn),學(xué)習(xí)xmlrpc,可以讓我們快速了解rpc的實現(xiàn)及原理,本文包括下面幾個部分:
xmlrpc演示 xmlrpc-API xmlrpc-server實現(xiàn) xmlrpc-client實現(xiàn) xmlrpc序列化/反序列化 小結(jié)
xmlrpc演示
xmlrpc可以直接運(yùn)行,啟動rpc服務(wù):
# python3 -m xmlrpc.server
Serving XML-RPC on localhost port 8000
It is advisable to run this example server within a secure, closed network.
127.0.0.1 - - [05/May/2021 18:03:16] "POST /RPC2 HTTP/1.1" 200 -
127.0.0.1 - - [05/May/2021 18:03:16] "POST /RPC2 HTTP/1.1" 200 -
啟動rpc客戶端:
# python3 -m xmlrpc.client
20210505T18:03:16
42
512
3
從服務(wù)端可以看到2個http請求。我們先看抓包得到的第一個http請求報文:
METHOD: POST
URL: http://localhost:8000/RPC2
HEADERS
accept-encoding: gzip
content-length: 120
content-type: text/xml
host: localhost:8000
user-agent: Python-xmlrpc/3.8
請求的數(shù)據(jù)是:
<?xml version='1.0'?>
<methodCall>
<methodName>
currentTime.getCurrentTime
</methodName>
<params>
</params>
</methodCall>
http響應(yīng)報文:
STATUS: 200 OK
HEADERS
content-length: 163
content-type: text/xml
date: Wed, 05 May 2021 09:40:57 GMT
server: BaseHTTP/0.6 Python/3.8.5
響應(yīng)的數(shù)據(jù)是:
<?xml version='1.0'?>
<methodResponse>
<params>
<param>
<value>
<dateTime.iso8601>
20210505T18:03:16
</dateTime.iso8601>
</value>
</param>
</params>
</methodResponse>
第二個請求的數(shù)據(jù)報文我就不貼出來了,下面是請求xml:
<?xml version='1.0'?>
<methodCall>
<methodName>
system.multicall
</methodName>
<params>
<param>
<value>
<array>
<data>
<value>
<struct>
<member>
<name>
methodName
</name>
<value>
<string>
getData
</string>
</value>
</member>
<member>
<name>
params
</name>
<value>
<array>
<data>
</data>
</array>
</value>
</member>
</struct>
</value>
<value>
<struct>
<member>
<name>
methodName
</name>
<value>
<string>
pow
</string>
</value>
</member>
<member>
<name>
params
</name>
<value>
<array>
<data>
<value>
<int>
2
</int>
</value>
<value>
<int>
9
</int>
</value>
</data>
</array>
</value>
</member>
</struct>
</value>
<value>
<struct>
<member>
<name>
methodName
</name>
<value>
<string>
add
</string>
</value>
</member>
<member>
<name>
params
</name>
<value>
<array>
<data>
<value>
<int>
1
</int>
</value>
<value>
<int>
2
</int>
</value>
</data>
</array>
</value>
</member>
</struct>
</value>
</data>
</array>
</value>
</param>
</params>
</methodCall>
下面是響應(yīng)xml:
<?xml version='1.0'?>
<methodResponse>
<params>
<param>
<value>
<array>
<data>
<value>
<array>
<data>
<value>
<string>
42
</string>
</value>
</data>
</array>
</value>
<value>
<array>
<data>
<value>
<int>
512
</int>
</value>
</data>
</array>
</value>
<value>
<array>
<data>
<value>
<int>
3
</int>
</value>
</data>
</array>
</value>
</data>
</array>
</value>
</param>
</params>
</methodResponse>
注: 為了完整展示xmlrpc數(shù)據(jù),所以我貼了xml全文,導(dǎo)致內(nèi)容有點(diǎn)長。
從演示可以看到xmlrpc下面幾個特點(diǎn):
使用http協(xié)議進(jìn)行數(shù)據(jù)傳輸。使用 POST方法,url是RPC2,content-type是text/xml。使用xml對請求/響應(yīng)進(jìn)行編碼。請求使用 methodCall標(biāo)簽, 響應(yīng)使用methodResponse標(biāo)簽。使用xml數(shù)據(jù)層層嵌套,冗余較多,看起來非常煩瑣 (這應(yīng)該是xmlrpc沒流行起來的原因之一)。
xmlrpc-API
繼續(xù)查看xmlrpc的API使用,服務(wù)端代碼:
class ExampleService:
def getData(self):
return '42'
class currentTime:
@staticmethod
def getCurrentTime():
return datetime.datetime.now()
with SimpleXMLRPCServer(("localhost", 8000)) as server:
server.register_function(pow)
server.register_function(lambda x,y: x+y, 'add')
server.register_instance(ExampleService(), allow_dotted_names=True)
server.register_multicall_functions()
print('Serving XML-RPC on localhost port 8000')
print('It is advisable to run this example server within a secure, closed network.')
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nKeyboard interrupt received, exiting.")
sys.exit(0)
服務(wù)端做了下面幾件事:
在8000端口創(chuàng)建SimpleXMLRPCServer的實例server 向server注冊 pow和名為add的lambda函數(shù)接口向server注冊服務(wù)實例instance(可能叫app更合適),instance帶有2個函數(shù)實現(xiàn): getData和currentTime.getCurrentTime向server注冊system.multicall實現(xiàn),這個實現(xiàn)支持多個rpc請求合并使用一個http請求 啟動server
客戶端是這樣使用的:
server = ServerProxy("http://localhost:8000")
print(server.currentTime.getCurrentTime())
multi = MultiCall(server)
multi.getData()
multi.pow(2,9)
multi.add(1,2)
try:
for response in multi():
print(response)
except Error as v:
print("ERROR", v)
創(chuàng)建了一個服務(wù)代理(可以理解為rpc-client) 調(diào)用服務(wù)端的實現(xiàn) currentTime.getCurrentTime使用MultiCall的方式調(diào)用 getData,pow和add三個rpc接口發(fā)送multicall請求,并循環(huán)打印服務(wù)調(diào)用結(jié)果
可以很明顯的對比出xmlrpc服務(wù)和http服務(wù)的不同:
rpc服務(wù)接口都是普通的函數(shù),比如pow,getData和getCurrentTime;這些接口和http的request和response是隔離的 客戶端需要額外實現(xiàn),并不是直接發(fā)送的http請求
同時大家對 RPC(remote procedure call) 應(yīng)該也有直觀了解,簡單的解釋就是遠(yuǎn)程函數(shù)調(diào)用。所謂遠(yuǎn)程:跨機(jī)器是遠(yuǎn)程,我們這里的跨進(jìn)程也是遠(yuǎn)程。至于如何實現(xiàn)遠(yuǎn)程函數(shù)調(diào)用,就是各個RPC框架的功能了,今天我們先看看xmlrpc的實現(xiàn)。
xmlrpc-server的實現(xiàn)
服務(wù)端http協(xié)議實現(xiàn)
xmlrpc中http協(xié)議由SimpleXMLRPCServer和SimpleXMLRPCRequestHandler實現(xiàn):
class SimpleXMLRPCServer(socketserver.TCPServer,
SimpleXMLRPCDispatcher):
allow_reuse_address = True
_send_traceback_header = False
def __init__(self, addr, requestHandler=SimpleXMLRPCRequestHandler,
logRequests=True, allow_none=False, encoding=None,
bind_and_activate=True, use_builtin_types=False):
self.logRequests = logRequests
SimpleXMLRPCDispatcher.__init__(self, allow_none, encoding, use_builtin_types)
socketserver.TCPServer.__init__(self, addr, requestHandler, bind_and_activate)
SimpleXMLRPCServer的父類TCPServer在之前的博文中有介紹,提供tcp服務(wù)的實現(xiàn)。SimpleXMLRPCRequestHandler負(fù)責(zé)http協(xié)議部分的實現(xiàn),而xmlrpc規(guī)范是必須使用POST請求到 /RPC2 重點(diǎn)就在 do_POST 方法:
class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler):
# rpc-url
rpc_paths = ('/', '/RPC2')
def do_POST(self):
...
max_chunk_size = 10*1024*1024
size_remaining = int(self.headers["content-length"])
L = []
while size_remaining:
chunk_size = min(size_remaining, max_chunk_size)
chunk = self.rfile.read(chunk_size)
if not chunk:
break
L.append(chunk)
size_remaining -= len(L[-1])
data = b''.join(L)
...
response = self.server._marshaled_dispatch(
data, getattr(self, '_dispatch', None), self.path
)
...
self.send_response(200)
self.send_header("Content-type", "text/xml")
self.send_header("Content-length", str(len(response)))
self.end_headers()
self.wfile.write(response)
do_POST方法分三段:
從http請求上讀取請求數(shù)據(jù),數(shù)據(jù)長度由 content-length 決定 使用server的_marshaled_dispatch方法調(diào)用rpc接口 將接口返回值包裝成http響應(yīng)返回
服務(wù)端rpc協(xié)議實現(xiàn)
SimpleXMLRPCServer的另外一個父類SimpleXMLRPCDispatcher提供了rpc協(xié)議的實現(xiàn):
class SimpleXMLRPCDispatcher:
def __init__(self, allow_none=False, encoding=None,
use_builtin_types=False):
# 接口函數(shù)字典
self.funcs = {}
# 服務(wù)實例 (app)
self.instance = None
self.allow_none = allow_none
self.encoding = encoding or 'utf-8'
self.use_builtin_types = use_builtin_types
def register_instance(self, instance, allow_dotted_names=False):
# 注冊服務(wù)實例對象
self.instance = instance
self.allow_dotted_names = allow_dotted_names
def register_function(self, function=None, name=None):
# 注冊接口方法
if name is None:
name = function.__name__
self.funcs[name] = function
return function
def register_multicall_functions(self):
"""Registers the XML-RPC multicall method in the system
namespace."""
# 復(fù)合調(diào)用
self.funcs.update({'system.multicall' : self.system_multicall})
instace和function的注冊比較簡單,我們可以跳過實現(xiàn)會略微復(fù)雜一點(diǎn)的multical,先看看注冊的接口如何在_marshaled_dispatch中調(diào)用:
def _marshaled_dispatch(self, data, dispatch_method = None, path = None):
try:
# 解析rpc接口和參數(shù)
params, method = loads(data, use_builtin_types=self.use_builtin_types)
# generate response
response = self._dispatch(method, params)
# wrap response in a singleton tuple
response = (response,)
# 生成xml響應(yīng)
response = dumps(response, methodresponse=1,
allow_none=self.allow_none, encoding=self.encoding)
except Fault as fault:
...
except:
...
return response.encode(self.encoding, 'xmlcharrefreplace')
def _dispatch(self, method, params):
try:
# call the matching registered function
# 查找接口
func = self.funcs[method]
except KeyError:
pass
else:
if func is not None:
# 執(zhí)行接口
return func(*params)
...
if self.instance is not None:
if hasattr(self.instance, '_dispatch'):
# call the `_dispatch` method on the instance
return self.instance._dispatch(method, params)
# call the instance's method directly
try:
func = resolve_dotted_attribute(
self.instance,
method,
self.allow_dotted_names
)
except AttributeError:
pass
else:
if func is not None:
return func(*params)
...
代碼比較長,主要做了2件事:
從請求中解析 params 和 method 根據(jù)method從func或者instance中調(diào)用方法并返回
服務(wù)端multi-call實現(xiàn)
了解 single-call 后,再回頭看 multi-call 的實現(xiàn),就比較容易。注冊接口:
def register_multicall_functions(self):
self.funcs.update({'system.multicall' : self.system_multicall})
funcs字典中會增加一個名為 system.multicall ,處理函數(shù)為system_multicall的調(diào)用:
def system_multicall(self, call_list):
"""system.multicall([{'methodName': 'add', 'params': [2, 2]}, ...]) => \
[[4], ...]
Allows the caller to package multiple XML-RPC calls into a single
request.
See http://www.xmlrpc.com/discuss/msgReader$1208
"""
results = []
# 順序執(zhí)行多個call
for call in call_list:
method_name = call['methodName']
params = call['params']
...
results.append([self._dispatch(method_name, params)])
...
return results
system_multicall和注釋介紹一樣,就是從請求中接受多個請求,然后逐一調(diào)用執(zhí)行。system.multicall的調(diào)用數(shù)據(jù)示例:
<methodName>
system.multicall
</methodName>
<params>
...
<member>
<name>
methodName
</name>
<value>
<string>
getData
</string>
</value>
</member>
...
<params>
xmlrpc-client實現(xiàn)
客戶端http協(xié)議實現(xiàn)
客戶端也需要實現(xiàn)http協(xié)議,主要在ServerProxy和Transport中(SafeTransport實現(xiàn)https)。ServerProxy包裝Transport對象:
class ServerProxy:
def __init__(self, uri, transport=None, encoding=None, verbose=False,
allow_none=False, use_datetime=False, use_builtin_types=False,
*, headers=(), context=None):
# get the url
type, uri = urllib.parse._splittype(uri)
self.__host, self.__handler = urllib.parse._splithost(uri)
..
handler = Transport
extra_kwargs = {}
transport = handler(use_datetime=use_datetime,
use_builtin_types=use_builtin_types,
headers=headers,
**extra_kwargs)
self.__transport = transport
...
def __request(self, methodname, params):
# call a method on the remote server
# 接口調(diào)用轉(zhuǎn)為xml請求數(shù)據(jù)
request = dumps(params, methodname, encoding=self.__encoding,
allow_none=self.__allow_none).encode(self.__encoding, 'xmlcharrefreplace')
response = self.__transport.request(
self.__host,
self.__handler,
request,
verbose=self.__verbose
)
return response
Transport實現(xiàn)http細(xì)節(jié):
class Transport:
"""Handles an HTTP transaction to an XML-RPC server."""
def __init__(self, use_datetime=False, use_builtin_types=False,
*, headers=()):
self._use_datetime = use_datetime
self._use_builtin_types = use_builtin_types
self._connection = (None, None)
self._headers = list(headers)
self._extra_headers = []
def request(self, host, handler, request_body, verbose=False):
http_conn = self.send_request(host, handler, request_body, verbose)
resp = http_conn.getresponse()
if resp.status == 200:
self.verbose = verbose
return self.parse_response(resp)
def send_request(self, host, handler, request_body, debug):
connection = self.make_connection(host)
headers = self._headers + self._extra_headers
...
connection.putrequest("POST", handler)
headers.append(("Content-Type", "text/xml"))
headers.append(("User-Agent", self.user_agent))
self.send_headers(connection, headers)
self.send_content(connection, request_body)
return connection
def make_connection(self, host):
if self._connection and host == self._connection[0]:
return self._connection[1]
# create a HTTP connection object from a host descriptor
chost, self._extra_headers, x509 = self.get_host_info(host)
self._connection = host, http.client.HTTPConnection(chost)
return self._connection[1]
def parse_response(self, response):
stream = response
p, u = self.getparser()
while 1:
data = stream.read(1024)
if not data:
break
if self.verbose:
print("body:", repr(data))
p.feed(data)
if stream is not response:
stream.close()
p.close()
return u.close()
使用http.client創(chuàng)建http連接 使用send_request發(fā)送http請求 使用parse_response解析http請求
客戶端rpc協(xié)議實現(xiàn)
在http協(xié)議上使用_Method包裝請求:
class ServerProxy:
def __getattr__(self, name):
# magic method dispatcher
return _Method(self.__request, name)
class _Method:
# some magic to bind an XML-RPC method to an RPC server.
# supports "nested" methods (e.g. examples.getStateName)
def __init__(self, send, name):
self.__send = send
self.__name = name
def __getattr__(self, name):
return _Method(self.__send, "%s.%s" % (self.__name, name))
def __call__(self, *args):
return self.__send(self.__name, args)
可以使用 server.currentTime.getCurrentTime() 發(fā)送請求,這是一個鏈?zhǔn)秸{(diào)用。server.currentTime會調(diào)用 ServerProxy.__getattr__ 得到一個_Method對象;繼續(xù)調(diào)用getCurrentTime會執(zhí)行 _Method.__getattr__ 又得到一個_Method對象,最后使用 getCurrentTime() 執(zhí)行這個method對象的call方法,會使用ServerProxy的call方法將請求發(fā)送出去。
客戶端multi-call實現(xiàn)
了解客戶端 single-call 實現(xiàn)后,繼續(xù)查看 multi-call,主要涉及下面3個類:
class _MultiCallMethod:
def __init__(self, call_list, name):
self.__call_list = call_list
self.__name = name
def __getattr__(self, name):
return _MultiCallMethod(self.__call_list, "%s.%s" % (self.__name, name))
def __call__(self, *args):
self.__call_list.append((self.__name, args)) # 添加一個call
class MultiCallIterator:
def __init__(self, results):
self.results = results
def __getitem__(self, i):
item = self.results[i]
if type(item) == type({}):
raise Fault(item['faultCode'], item['faultString'])
elif type(item) == type([]):
return item[0]
else:
raise ValueError("unexpected type in multicall result")
class MultiCall:
def __init__(self, server):
self.__server = server
self.__call_list = []
def __getattr__(self, name):
return _MultiCallMethod(self.__call_list, name)
def __call__(self):
marshalled_list = []
for name, args in self.__call_list:
marshalled_list.append({'methodName' : name, 'params' : args})
# 最后執(zhí)行system.multical
return MultiCallIterator(self.__server.system.multicall(marshalled_list))
代碼行數(shù)比較多,和Method一樣都是使用python的魔法函數(shù):call, getattr和getitem, 可以對比調(diào)用示例體會:
multi = MultiCall(server)
multi.getData()
multi.pow(2,9)
multi.add(1,2)
for response in multi():
print(response)
xmlrpc序列化/反序列化
rpc服務(wù)需要跨網(wǎng)絡(luò)傳輸,server和client之間的數(shù)據(jù)還需要進(jìn)行序列化/反序列化。主要由Marshaller和Unmarshaller兩個類實現(xiàn):
# client.py
class Marshaller:
...
class Unmarshaller:
...
xmlrpc支持下面9種數(shù)據(jù)類型:
array base64 boolean date/time double integer string struct nil
一些數(shù)據(jù)類型的,比如double和nil在python中是不存在的。這2種數(shù)據(jù)的編/解碼如下:
class Marshaller:
def dump_double(self, value, write):
write("<value><double>")
write(repr(value))
write("</double></value>\n")
dispatch[float] = dump_double
def dump_nil (self, value, write):
if not self.allow_none:
raise TypeError("cannot marshal None unless allow_none is enabled")
write("<value><nil/></value>")
dispatch[type(None)] = dump_nil
class Unmarshaller:
def end_double(self, data):
self.append(float(data)) # float
self._value = 0
dispatch["double"] = end_double
dispatch["float"] = end_double
def end_nil (self, data):
self.append(None) # None
self._value = 0
dispatch["nil"] = end_nil
小結(jié)
xmlrpc不考慮tcp協(xié)議的情況下,主要是2層模型,底層是http協(xié)議,上層是xmlrpc協(xié)議。http協(xié)議負(fù)責(zé)網(wǎng)絡(luò)傳輸;xmlrpc協(xié)議負(fù)責(zé)將rpc請求轉(zhuǎn)換成xml數(shù)據(jù),然后再反序列化成請求執(zhí)行。

參考鏈接
https://en.wikipedia.org/wiki/XML-RPC
