基于 Python 探針完成調(diào)用庫的數(shù)據(jù)提取

劇照:《時光之輪》
作者:so1n;來源:博海拾貝diary
前記
最近在完善公司的監(jiān)控系統(tǒng), 發(fā)現(xiàn)在項目運行時經(jīng)常會出現(xiàn)一些運行時的問題, 這些問題往往不是一個子服務引發(fā)的問題, 而可能是某個環(huán)節(jié)出現(xiàn)了問題, 這時候就需要引入APM系統(tǒng)。
在收集APM數(shù)據(jù)時發(fā)現(xiàn)在Python生態(tài)中針對web框架都有完善的APM中間件用于接口統(tǒng)計與監(jiān)控, 但是第三方調(diào)用庫相關(guān)的APM實現(xiàn)都比較少(幾乎沒有), 同時這些庫大多數(shù)也都沒提供一些鉤子實現(xiàn)。這就需要自己去封裝一些庫, 為這些庫實現(xiàn)一套調(diào)用過程的數(shù)據(jù)提供邏輯。
本文是以Python的aiomysql庫為例,闡述如何基于Python的探針完成調(diào)用庫的調(diào)用過程統(tǒng)計與監(jiān)控的封裝。
注: 監(jiān)控的形式的agent有很多種,如
Prometheus,Zabbix,?Graphite和Opentracing他們的數(shù)據(jù)源有很大的不同,但是他們都是基于元數(shù)據(jù)封裝成自己的源數(shù)據(jù),然后發(fā)送到對應的服務,所以本文只介紹如何提取元數(shù)據(jù),剩下的如何發(fā)送需要自己按照特定的監(jiān)控系統(tǒng)去實現(xiàn)。注:這里以aiomysql庫來做示例,提取數(shù)據(jù)的方法應該用統(tǒng)一的dbapi2, 本文只闡述如何簡單的實現(xiàn)。
1.簡單粗暴的方法--對mysql庫進行封裝
要統(tǒng)計一個執(zhí)行過程, 就需要知道這個執(zhí)行過程的開始位置和結(jié)束位置, 所以最簡單粗暴的方法就是基于要調(diào)用的方法進行封裝,在框架調(diào)用MySQL庫和MySQL庫中間實現(xiàn)一個中間層, 在中間層完成耗時統(tǒng)計,如:
#?偽代碼
def?my_execute(conn,?sql,?param):
????#?針對MySql庫的統(tǒng)計封裝組件
????with?MyTracer(conn,?sql,?param):
????????#?以下為正常使用MySql庫的代碼
????????with?conn.cursor?as?cursor:
????????????cursor.execute(sql,?param)
????????????...
看樣子實現(xiàn)起來非常不錯, 而且更改非常方便, 但由于是在最頂層的API上進行修改, 其實是非常不靈活的, 同時在cursor.execute里會進行一些預操作, 如把sql和param進行拼接, 調(diào)用nextset清除當前游標的數(shù)據(jù)等等。我們最后拿到的數(shù)據(jù)如時間耗時也是不準確的, 同時也沒辦法得到一些詳細的元數(shù)據(jù), 如錯誤碼等等.
如果要拿到最直接有用的數(shù)據(jù),就只能去改源代碼, 然后再調(diào)用源代碼了, 但是如果每個庫都需要改源代碼才能統(tǒng)計, 那也太麻煩了, 好在Python也提供了一些類似探針的接口, 可以通過探針把庫的源碼進行替換完成我們的代碼.
2.Python的探針
在Python中可以通過sys.meta_path來實現(xiàn)import hook的功能, 當執(zhí)行 import 相關(guān)操作時, 會根據(jù)sys.meta_path定義的對象對import相關(guān)庫進行更改.
sys.meta_path中的對象需要實現(xiàn)一個find_module方法, 這個find_module方法返回None或一個實現(xiàn)了load_module方法的對象, 我們可以通過這個對象, 針對一些庫在import時, 把相關(guān)的方法進行替換, 簡單用法如下,通過hooktime.sleep讓他在sleep的時候能打印消耗的時間.
github源碼存儲:https://github.com/so1n/example/blob/master/example_python/example_python/python_hook/hook_demo
import?importlib
import?sys
from?functools?import?wraps
def?func_wrapper(func):
????"""這里通過一個裝飾器來達到貍貓換太子和獲取數(shù)據(jù)的效果"""
????@wraps(func)
????def?wrapper(*args,?**kwargs):
????????#?記錄開始時間
????????start?=?time.time()
????????result?=?func(*args,?**kwargs)
????????#?統(tǒng)計消耗時間
????????end?=?time.time()
????????print(f"speed?time:{end?-?start}")
????????return?result
????return?wrapper
class?MetaPathFinder:
????def?find_module(self,?fullname,?path=None):
????????#?執(zhí)行時可以看出來在import哪些模塊
????????print(f'find?module:{path}:{fullname}')
????????return?MetaPathLoader()
class?MetaPathLoader:
????def?load_module(self,?fullname):
????????#?import的模塊都會存放在sys.modules里面,?通過判斷可以減少重復import
????????if?fullname?in?sys.modules:
????????????return?sys.modules[fullname]
????????#?防止遞歸調(diào)用
????????finder?=?sys.meta_path.pop(0)
????????#?導入?module
????????module?=?importlib.import_module(fullname)
????????if?fullname?==?'time':
????????????#?替換函數(shù)
????????????module.sleep?=?func_wrapper(module.sleep)
????????sys.meta_path.insert(0,?finder)
????????return?module
sys.meta_path.insert(0,?MetaPathFinder())
if?__name__?==?'__main__':
????import?time
????time.sleep(1)
#?輸出示例:
#?find?module:datetime
#?find?module:time
#?load?module:time
#?find?module:math
#?find?module:_datetime
#?speed?time:1.00073385238647468
3.制作探針模塊
了解完了主要流程后, 可以開始制作自己的探針模塊了, 由于示例只涉及到aiomysql模塊, 那么在MetaPathFinder.find_module中需要只對aiomysql模塊進行處理, 其他的先忽略. 然后我們需要確定我們要把aiomysql的哪個功能給替換, 從業(yè)務上來說, 一般情況下我們只要cursor.execute,?cursor.fetchone,?cursor.fetchall,?cursor.executemany這幾個主要的操作,所以需要深入cursor的源碼:https://github.com/aio-libs/aiomysql/blob/master/aiomysql/cursors.py, 看看如何去更改代碼, 后者重載哪個函數(shù).
先cursor.execute的源碼(cursor.executemanay也類似), 發(fā)現(xiàn)會先調(diào)用self.nextset的方法, 把上個請求的數(shù)據(jù)先拿完, 再合并sql語句, 最后通過self._query進行查詢:
async?def?execute(self,?query,?args=None):
????"""Executes?the?given?operation
????Executes?the?given?operation?substituting?any?markers?with
????the?given?parameters.
????For?example,?getting?all?rows?where?id?is?5:
????????cursor.execute("SELECT?*?FROM?t1?WHERE?id?=?%s",?(5,))
????:param?query:?``str``?sql?statement
????:param?args:?``tuple``?or?``list``?of?arguments?for?sql?query
????:returns:?``int``,?number?of?rows?that?has?been?produced?of?affected
????"""
????conn?=?self._get_db()
????while?(await?self.nextset()):
????????pass
????if?args?is?not?None:
????????query?=?query?%?self._escape_args(args,?conn)
????await?self._query(query)
????self._executed?=?query
????if?self._echo:
????????logger.info(query)
????????logger.info("%r",?args)
????return?self._rowcount
再看cursor.fetchone的源碼(cursor.fetchall也類似), 發(fā)現(xiàn)其實是從緩存中獲取數(shù)據(jù), 這些數(shù)據(jù)在執(zhí)行cursor.execute中就已經(jīng)獲取了:
def?fetchone(self):
????"""Fetch?the?next?row?"""
????self._check_executed()
????fut?=?self._loop.create_future()
????if?self._rows?is?None?or?self._rownumber?>=?len(self._rows):
????????fut.set_result(None)
????????return?fut
????result?=?self._rows[self._rownumber]
????self._rownumber?+=?1
????fut?=?self._loop.create_future()
????fut.set_result(result)
????return?fut
綜合上面的分析, 我們只要對核心的方法self._query進行重載即可拿到我們要的數(shù)據(jù), 從源碼中我們可以知道, 我們能獲取到傳入self._query的self和sql參數(shù), 根據(jù)self又能獲取到查詢的結(jié)果, 同時我們通過裝飾器能獲取到運行的時間, 要的數(shù)據(jù)基本都到齊了, 按照思路修改后的代碼如下
import?importlib
import?time
import?sys
from?functools?import?wraps
from?typing?import?cast,?Any,?Callable,?Optional,?Tuple,?TYPE_CHECKING
from?types?import?ModuleType
if?TYPE_CHECKING:
????import?aiomysql
def?func_wrapper(func:?Callable):
????@wraps(func)
????async?def?wrapper(*args,?**kwargs)?->?Any:
????????start:?float?=?time.time()
????????func_result:?Any?=?await?func(*args,?**kwargs)
????????end:?float?=?time.time()
????????#?根據(jù)_query可以知道,?第一格參數(shù)是self,?第二個參數(shù)是sql
????????self:?aiomysql.Cursor?=?args[0]
????????sql:?str?=?args[1]
????????#?通過self,我們可以拿到其他的數(shù)據(jù)
????????db:?str?=?self._connection.db
????????user:?str?=?self._connection.user
????????host:?str?=?self._connection.host
????????port:?str?=?self._connection.port
????????execute_result:?Tuple[Tuple]?=?self._rows
????????#?可以根據(jù)自己定義的agent把數(shù)據(jù)發(fā)送到指定的平臺,?然后我們就可以在平臺上看到對應的數(shù)據(jù)或進行監(jiān)控了,?
????????#?這里只是打印一部分數(shù)據(jù)出來
????????print({
????????????"sql":?sql,
????????????"db":?db,
????????????"user":?user,
????????????"host":?host,
????????????"port":?port,
????????????"result":?execute_result,
????????????"speed?time":?end?-?start
????????})
????????return?func_result
????return?cast(Callable,?wrapper)
class?MetaPathFinder:
????@staticmethod
????def?find_module(fullname:?str,?path:?Optional[str]?=?None)?->?Optional["MetaPathLoader"]:
????????if?fullname?==?'aiomysql':
????????????#?只有aiomysql才進行hook
????????????return?MetaPathLoader()
????????else:
????????????return?None
class?MetaPathLoader:
????@staticmethod
????def?load_module(fullname:?str):
????????if?fullname?in?sys.modules:
????????????return?sys.modules[fullname]
????????#?防止遞歸調(diào)用
????????finder:?"MetaPathFinder"?=?sys.meta_path.pop(0)
????????#?導入?module
????????module:?ModuleType?=?importlib.import_module(fullname)
????????#?針對_query進行hook
????????module.Cursor._query?=?func_wrapper(module.Cursor._query)
????????sys.meta_path.insert(0,?finder)
????????return?module
async?def?test_mysql()?->?None:
????import?aiomysql
????pool:?aiomysql.Pool?=?await?aiomysql.create_pool(
????????host='127.0.0.1',?port=3306,?user='root',?password='123123',?db='mysql'
????)
????async?with?pool.acquire()?as?conn:
????????async?with?conn.cursor()?as?cur:
????????????await?cur.execute("SELECT?42;")
????????????(r,)?=?await?cur.fetchone()
????????????assert?r?==?42
????pool.close()
????await?pool.wait_closed()
if?__name__?==?'__main__':
????sys.meta_path.insert(0,?MetaPathFinder())
????import?asyncio
????asyncio.run(test_mysql())
#?輸出示例:
#?可以看出sql語句與我們輸入的一樣,?db,?user,?host,?port等參數(shù)也是,?還能知道執(zhí)行的結(jié)果和運行時間
#?{'sql':?'SELECT?42;',?'db':?'mysql',?'user':?'root',?'host':?'127.0.0.1',?'port':?3306,?'result':?((42,),),?'speed?time':?0.00045609474182128906}
這個例子看來很不錯, 但是需要在調(diào)用的入口處顯式調(diào)用該邏輯, 通常一個項目可能有幾個入口, 每個入口都顯示調(diào)用該邏輯會非常麻煩, 而且必須先調(diào)用我們的hook邏輯后才能import, 這樣就得訂好引入規(guī)范, 不然就可能出現(xiàn)部分地方hook不成功, 如果能把引入hook這個邏輯安排在解析器啟動后馬上執(zhí)行, 就可以完美地解決這個問題了.?
查閱了一翻資料后發(fā)現(xiàn),python解釋器初始化的時候會自動import PYTHONPATH下存在的sitecustomize和usercustomize模塊, 我們只要創(chuàng)建該模塊, 并在模塊里面寫入我們的 替換函數(shù)即可。
具體結(jié)構(gòu)如下,也可以參考github存儲:https://github.com/so1n/example/blob/master/example_python/example_python/python_hook/hook_by_sys/
.
├──?__init__.py
├──?hook_aiomysql.py
├──?sitecustomize.py
└──?test_auto_hook.py
hook_aiomysql.py是我們制作探針的代碼為例子, 而sitecustomize.py存放的代碼如下, 非常簡單, 就是引入我們的探針代碼, 并插入到sys.meta_path:
import?sys
from?hook_aiomysql?import?MetaPathFinder
sys.meta_path.insert(0,?MetaPathFinder())
test_auto_hook.py則是測試代碼:
import?asyncio
from?hook_aiomysql?import?test_mysql
asyncio.run(test_mysql())
接下來只要設置PYTHONPATH并運行我們的代碼即可(如果是項目的話一般交由superisor啟動,則可以在配置文件中設置好PYTHONPATH):
(.venv)????python_hook?git:(master)???export?PYTHONPATH=.??????
(.venv)????python_hook?git:(master)???python?test_auto_hook.py?
{'sql':?'SELECT?42;',?'db':?'mysql',?'user':?'root',?'host':?'127.0.0.1',?'port':?3306,?'result':?((42,),),?'speed?time':?0.000213623046875}
4.直接替換方法
可以看到上面的方法很好的運行了, 而且可以很方便的嵌入到我們的項目中, 但是依賴與sitecustomize.py文件很難讓他抽離成一個第三方的庫, 如果要抽離成第三方的庫就得考慮看看有沒有其他的方法。在上面介紹MetaPathLoader時說到了sys.module, 在里面通過sys.modules來減少重復引入:
class?MetaPathLoader:
????def?load_module(self,?fullname):
????????#?import的模塊都會存放在sys.modules里面,?通過判斷可以減少重復import
????????if?fullname?in?sys.modules:
????????????return?sys.modules[fullname]
????????#?防止遞歸調(diào)用
????????finder?=?sys.meta_path.pop(0)
????????#?導入?module
????????module?=?importlib.import_module(fullname)
????????if?fullname?==?'time':
????????????#?替換函數(shù)
????????????module.sleep?=?func_wrapper(module.sleep)
????????sys.meta_path.insert(0,?finder)
????????return?module
這個減少重復引入的原理是, 每次引入一個模塊后, 他就會存放在sys.modules, 如果是重復引入, 就會直接刷新成最新引入的模塊。上面之所以會考慮到減少重復import是因為我們不會在程序運行時升級第三方庫的依賴。利用到我們可以不考慮重復引入同名不同實現(xiàn)的模塊, 以及sys.modules會緩存引入模塊的特點, 我們可以把上面的邏輯簡化成引入模塊->替換當前模塊方法為我們修改的hook方法。第一次接觸這個方法是從opentracing-python-instrumentation:https://github.com/uber-common/opentracing-python-instrumentation學的, 不過他夾帶著其他的封裝, 所以我這里進行了簡化處理 github代碼倉:https://github.com/so1n/example/tree/master/example_python/example_python/python_hook/hook_by_global:
import?time
from?functools?import?wraps
from?typing?import?Any,?Callable,?Tuple,?cast
import?aiomysql
def?func_wrapper(func:?Callable):
????"""和上面一樣的封裝函數(shù),?這里簡單略過"""
????
#?判斷是否hook過
_IS_HOOK:?bool?=?False
#?存放原來的_query
_query:?Callable?=?aiomysql.Cursor._query
#?hook函數(shù)
def?install_hook()?->?None:
????_IS_HOOK?=?False
????if?_IS_HOOK:
????????return
????aiomysql.Cursor._query?=?func_wrapper(aiomysql.Cursor._query)
????_IS_HOOK?=?True
#?還原到原來的函數(shù)方法
def?reset_hook()?->?None:
????aiomysql.Cursor._query?=?_query
????_IS_HOOK?=?False
代碼簡單明了,接下來跑一跑剛才的測試:
import?asyncio
import?aiomysql
from?demo?import?install_hook,?reset_hook
async?def?test_mysql()?->?None:
????pool:?aiomysql.Pool?=?await?aiomysql.create_pool(
????????host='127.0.0.1',?port=3306,?user='root',?password='',?db='mysql'
????)
????async?with?pool.acquire()?as?conn:
????????async?with?conn.cursor()?as?cur:
????????????await?cur.execute("SELECT?42;")
????????????(r,)?=?await?cur.fetchone()
????????????assert?r?==?42
????pool.close()
????await?pool.wait_closed()
print("install?hook")
install_hook()
asyncio.run(test_mysql())
print("reset?hook")
reset_hook()
asyncio.run(test_mysql())
print("end")
通過測試輸出可以發(fā)現(xiàn)我們的邏輯的正確的, install hook后能出現(xiàn)我們提取的元信息, 而reset后則不會打印原信息
install?hook
{'sql':?'SELECT?42;',?'db':?'mysql',?'user':?'root',?'host':?'127.0.0.1',?'port':?3306,?'result':?((42,),),?'speed?time':?0.000347137451171875}
reset?hook
end
5.總結(jié)
得益于Python動態(tài)語言的特性, 我們可以很容易的為第三方庫實現(xiàn)鉤子方法,上面說的兩種方法中, 第二種方法非常簡單, 但在自己項目中最好還是采用第一種方法, 因為Python是通過一行一行代碼進行掃描執(zhí)行的, 第二種方法只能放在入口代碼中, 并且要在被hook的對象實例化之前執(zhí)行, 不然就會實現(xiàn)hook失敗的現(xiàn)象, 而第一種方法除了麻煩外, 基本上能躲避所有坑。

還不過癮?試試它們
