<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          基于 Python 探針完成調(diào)用庫的數(shù)據(jù)提取

          共 11449字,需瀏覽 23分鐘

           ·

          2021-12-10 14:02

          劇照:《時光之輪》

          作者:so1n;來源:博海拾貝diary

          前記

          最近在完善公司的監(jiān)控系統(tǒng), 發(fā)現(xiàn)在項目運行時經(jīng)常會出現(xiàn)一些運行時的問題, 這些問題往往不是一個子服務引發(fā)的問題, 而可能是某個環(huán)節(jié)出現(xiàn)了問題, 這時候就需要引入APM系統(tǒng)。

          在收集APM數(shù)據(jù)時發(fā)現(xiàn)在Python生態(tài)中針對web框架都有完善的APM中間件用于接口統(tǒng)計與監(jiān)控, 但是第三方調(diào)用庫相關(guān)的APM實現(xiàn)都比較少(幾乎沒有), 同時這些庫大多數(shù)也都沒提供一些鉤子實現(xiàn)。這就需要自己去封裝一些庫, 為這些庫實現(xiàn)一套調(diào)用過程的數(shù)據(jù)提供邏輯。

          本文是以Pythonaiomysql庫為例,闡述如何基于Python的探針完成調(diào)用庫的調(diào)用過程統(tǒng)計與監(jiān)控的封裝。

          注: 監(jiān)控的形式的agent有很多種,如Prometheus,Zabbix,?GraphiteOpentracing他們的數(shù)據(jù)源有很大的不同,但是他們都是基于元數(shù)據(jù)封裝成自己的源數(shù)據(jù),然后發(fā)送到對應的服務,所以本文只介紹如何提取元數(shù)據(jù),剩下的如何發(fā)送需要自己按照特定的監(jiān)控系統(tǒng)去實現(xiàn)。注:這里以aiomysql庫來做示例,提取數(shù)據(jù)的方法應該用統(tǒng)一的dbapi2, 本文只闡述如何簡單的實現(xiàn)。

          原文地址:https://so1n.me/2020/11/18/%E5%9F%BA%E4%BA%8EPython%E6%8E%A2%E9%92%88%E5%AE%8C%E6%88%90%E8%B0%83%E7%94%A8%E5%BA%93%E7%9A%84%E6%95%B0%E6%8D%AE%E6%8F%90%E5%8F%96

          1.簡單粗暴的方法--對mysql庫進行封裝

          要統(tǒng)計一個執(zhí)行過程, 就需要知道這個執(zhí)行過程的開始位置和結(jié)束位置, 所以最簡單粗暴的方法就是基于要調(diào)用的方法進行封裝,在框架調(diào)用MySQL庫和MySQL庫中間實現(xiàn)一個中間層, 在中間層完成耗時統(tǒng)計,如:

          #?偽代碼

          def?my_execute(conn,?sql,?param):
          ????#?針對MySql庫的統(tǒng)計封裝組件
          ????with?MyTracer(conn,?sql,?param):
          ????????#?以下為正常使用MySql庫的代碼
          ????????with?conn.cursor?as?cursor:
          ????????????cursor.execute(sql,?param)
          ????????????...

          看樣子實現(xiàn)起來非常不錯, 而且更改非常方便, 但由于是在最頂層的API上進行修改, 其實是非常不靈活的, 同時在cursor.execute里會進行一些預操作, 如把sql和param進行拼接, 調(diào)用nextset清除當前游標的數(shù)據(jù)等等。我們最后拿到的數(shù)據(jù)如時間耗時也是不準確的, 同時也沒辦法得到一些詳細的元數(shù)據(jù), 如錯誤碼等等.

          如果要拿到最直接有用的數(shù)據(jù),就只能去改源代碼, 然后再調(diào)用源代碼了, 但是如果每個庫都需要改源代碼才能統(tǒng)計, 那也太麻煩了, 好在Python也提供了一些類似探針的接口, 可以通過探針把庫的源碼進行替換完成我們的代碼.

          2.Python的探針

          Python中可以通過sys.meta_path來實現(xiàn)import hook的功能, 當執(zhí)行 import 相關(guān)操作時, 會根據(jù)sys.meta_path定義的對象對import相關(guān)庫進行更改.

          sys.meta_path中的對象需要實現(xiàn)一個find_module方法, 這個find_module方法返回None或一個實現(xiàn)了load_module方法的對象, 我們可以通過這個對象, 針對一些庫在import時, 把相關(guān)的方法進行替換, 簡單用法如下,通過hooktime.sleep讓他在sleep的時候能打印消耗的時間.

          github源碼存儲:https://github.com/so1n/example/blob/master/example_python/example_python/python_hook/hook_demo

          import?importlib
          import?sys
          from?functools?import?wraps


          def?func_wrapper(func):
          ????"""這里通過一個裝飾器來達到貍貓換太子和獲取數(shù)據(jù)的效果"""
          ????@wraps(func)
          ????def?wrapper(*args,?**kwargs):
          ????????#?記錄開始時間
          ????????start?=?time.time()
          ????????result?=?func(*args,?**kwargs)
          ????????#?統(tǒng)計消耗時間
          ????????end?=?time.time()
          ????????print(f"speed?time:{end?-?start}")
          ????????return?result
          ????return?wrapper


          class?MetaPathFinder:

          ????def?find_module(self,?fullname,?path=None):
          ????????#?執(zhí)行時可以看出來在import哪些模塊
          ????????print(f'find?module:{path}:{fullname}')
          ????????return?MetaPathLoader()


          class?MetaPathLoader:

          ????def?load_module(self,?fullname):
          ????????#?import的模塊都會存放在sys.modules里面,?通過判斷可以減少重復import
          ????????if?fullname?in?sys.modules:
          ????????????return?sys.modules[fullname]
          ????????#?防止遞歸調(diào)用
          ????????finder?=?sys.meta_path.pop(0)
          ????????#?導入?module
          ????????module?=?importlib.import_module(fullname)
          ????????if?fullname?==?'time':
          ????????????#?替換函數(shù)
          ????????????module.sleep?=?func_wrapper(module.sleep)
          ????????sys.meta_path.insert(0,?finder)
          ????????return?module


          sys.meta_path.insert(0,?MetaPathFinder())


          if?__name__?==?'__main__':
          ????import?time
          ????time.sleep(1)


          #?輸出示例:
          #?find?module:datetime
          #?find?module:time
          #?load?module:time
          #?find?module:math
          #?find?module:_datetime
          #?speed?time:1.00073385238647468

          3.制作探針模塊

          了解完了主要流程后, 可以開始制作自己的探針模塊了, 由于示例只涉及到aiomysql模塊, 那么在MetaPathFinder.find_module中需要只對aiomysql模塊進行處理, 其他的先忽略. 然后我們需要確定我們要把aiomysql的哪個功能給替換, 從業(yè)務上來說, 一般情況下我們只要cursor.execute,?cursor.fetchone,?cursor.fetchall,?cursor.executemany這幾個主要的操作,所以需要深入cursor的源碼:https://github.com/aio-libs/aiomysql/blob/master/aiomysql/cursors.py, 看看如何去更改代碼, 后者重載哪個函數(shù).

          cursor.execute的源碼(cursor.executemanay也類似), 發(fā)現(xiàn)會先調(diào)用self.nextset的方法, 把上個請求的數(shù)據(jù)先拿完, 再合并sql語句, 最后通過self._query進行查詢:

          async?def?execute(self,?query,?args=None):
          ????"""Executes?the?given?operation
          ????Executes?the?given?operation?substituting?any?markers?with
          ????the?given?parameters.
          ????For?example,?getting?all?rows?where?id?is?5:
          ????????cursor.execute("SELECT?*?FROM?t1?WHERE?id?=?%s",?(5,))
          ????:param?query:?``str``?sql?statement
          ????:param?args:?``tuple``?or?``list``?of?arguments?for?sql?query
          ????:returns:?``int``,?number?of?rows?that?has?been?produced?of?affected
          ????"""

          ????conn?=?self._get_db()

          ????while?(await?self.nextset()):
          ????????pass

          ????if?args?is?not?None:
          ????????query?=?query?%?self._escape_args(args,?conn)

          ????await?self._query(query)
          ????self._executed?=?query
          ????if?self._echo:
          ????????logger.info(query)
          ????????logger.info("%r",?args)
          ????return?self._rowcount

          再看cursor.fetchone的源碼(cursor.fetchall也類似), 發(fā)現(xiàn)其實是從緩存中獲取數(shù)據(jù), 這些數(shù)據(jù)在執(zhí)行cursor.execute中就已經(jīng)獲取了:

          def?fetchone(self):
          ????"""Fetch?the?next?row?"""
          ????self._check_executed()
          ????fut?=?self._loop.create_future()

          ????if?self._rows?is?None?or?self._rownumber?>=?len(self._rows):
          ????????fut.set_result(None)
          ????????return?fut
          ????result?=?self._rows[self._rownumber]
          ????self._rownumber?+=?1

          ????fut?=?self._loop.create_future()
          ????fut.set_result(result)
          ????return?fut

          綜合上面的分析, 我們只要對核心的方法self._query進行重載即可拿到我們要的數(shù)據(jù), 從源碼中我們可以知道, 我們能獲取到傳入self._queryselfsql參數(shù), 根據(jù)self又能獲取到查詢的結(jié)果, 同時我們通過裝飾器能獲取到運行的時間, 要的數(shù)據(jù)基本都到齊了, 按照思路修改后的代碼如下

          import?importlib
          import?time
          import?sys
          from?functools?import?wraps

          from?typing?import?cast,?Any,?Callable,?Optional,?Tuple,?TYPE_CHECKING
          from?types?import?ModuleType
          if?TYPE_CHECKING:
          ????import?aiomysql


          def?func_wrapper(func:?Callable):
          ????@wraps(func)
          ????async?def?wrapper(*args,?**kwargs)?->?Any:
          ????????start:?float?=?time.time()
          ????????func_result:?Any?=?await?func(*args,?**kwargs)
          ????????end:?float?=?time.time()

          ????????#?根據(jù)_query可以知道,?第一格參數(shù)是self,?第二個參數(shù)是sql
          ????????self:?aiomysql.Cursor?=?args[0]
          ????????sql:?str?=?args[1]
          ????????#?通過self,我們可以拿到其他的數(shù)據(jù)
          ????????db:?str?=?self._connection.db
          ????????user:?str?=?self._connection.user
          ????????host:?str?=?self._connection.host
          ????????port:?str?=?self._connection.port
          ????????execute_result:?Tuple[Tuple]?=?self._rows
          ????????#?可以根據(jù)自己定義的agent把數(shù)據(jù)發(fā)送到指定的平臺,?然后我們就可以在平臺上看到對應的數(shù)據(jù)或進行監(jiān)控了,?
          ????????#?這里只是打印一部分數(shù)據(jù)出來
          ????????print({
          ????????????"sql":?sql,
          ????????????"db":?db,
          ????????????"user":?user,
          ????????????"host":?host,
          ????????????"port":?port,
          ????????????"result":?execute_result,
          ????????????"speed?time":?end?-?start
          ????????})
          ????????return?func_result
          ????return?cast(Callable,?wrapper)


          class?MetaPathFinder:

          ????@staticmethod
          ????def?find_module(fullname:?str,?path:?Optional[str]?=?None)?->?Optional["MetaPathLoader"]:
          ????????if?fullname?==?'aiomysql':
          ????????????#?只有aiomysql才進行hook
          ????????????return?MetaPathLoader()
          ????????else:
          ????????????return?None


          class?MetaPathLoader:

          ????@staticmethod
          ????def?load_module(fullname:?str):
          ????????if?fullname?in?sys.modules:
          ????????????return?sys.modules[fullname]
          ????????#?防止遞歸調(diào)用
          ????????finder:?"MetaPathFinder"?=?sys.meta_path.pop(0)
          ????????#?導入?module
          ????????module:?ModuleType?=?importlib.import_module(fullname)
          ????????#?針對_query進行hook
          ????????module.Cursor._query?=?func_wrapper(module.Cursor._query)
          ????????sys.meta_path.insert(0,?finder)
          ????????return?module


          async?def?test_mysql()?->?None:
          ????import?aiomysql
          ????pool:?aiomysql.Pool?=?await?aiomysql.create_pool(
          ????????host='127.0.0.1',?port=3306,?user='root',?password='123123',?db='mysql'
          ????)
          ????async?with?pool.acquire()?as?conn:
          ????????async?with?conn.cursor()?as?cur:
          ????????????await?cur.execute("SELECT?42;")
          ????????????(r,)?=?await?cur.fetchone()
          ????????????assert?r?==?42
          ????pool.close()
          ????await?pool.wait_closed()

          if?__name__?==?'__main__':
          ????sys.meta_path.insert(0,?MetaPathFinder())
          ????import?asyncio

          ????asyncio.run(test_mysql())

          #?輸出示例:
          #?可以看出sql語句與我們輸入的一樣,?db,?user,?host,?port等參數(shù)也是,?還能知道執(zhí)行的結(jié)果和運行時間
          #?{'sql':?'SELECT?42;',?'db':?'mysql',?'user':?'root',?'host':?'127.0.0.1',?'port':?3306,?'result':?((42,),),?'speed?time':?0.00045609474182128906}

          這個例子看來很不錯, 但是需要在調(diào)用的入口處顯式調(diào)用該邏輯, 通常一個項目可能有幾個入口, 每個入口都顯示調(diào)用該邏輯會非常麻煩, 而且必須先調(diào)用我們的hook邏輯后才能import, 這樣就得訂好引入規(guī)范, 不然就可能出現(xiàn)部分地方hook不成功, 如果能把引入hook這個邏輯安排在解析器啟動后馬上執(zhí)行, 就可以完美地解決這個問題了.?

          查閱了一翻資料后發(fā)現(xiàn),python解釋器初始化的時候會自動import PYTHONPATH下存在的sitecustomizeusercustomize模塊, 我們只要創(chuàng)建該模塊, 并在模塊里面寫入我們的 替換函數(shù)即可。

          具體結(jié)構(gòu)如下,也可以參考github存儲:https://github.com/so1n/example/blob/master/example_python/example_python/python_hook/hook_by_sys/

          .
          ├──?__init__.py
          ├──?hook_aiomysql.py
          ├──?sitecustomize.py
          └──?test_auto_hook.py

          hook_aiomysql.py是我們制作探針的代碼為例子, 而sitecustomize.py存放的代碼如下, 非常簡單, 就是引入我們的探針代碼, 并插入到sys.meta_path

          import?sys
          from?hook_aiomysql?import?MetaPathFinder

          sys.meta_path.insert(0,?MetaPathFinder())

          test_auto_hook.py則是測試代碼:

          import?asyncio
          from?hook_aiomysql?import?test_mysql


          asyncio.run(test_mysql())

          接下來只要設置PYTHONPATH并運行我們的代碼即可(如果是項目的話一般交由superisor啟動,則可以在配置文件中設置好PYTHONPATH):

          (.venv)????python_hook?git:(master)???export?PYTHONPATH=.??????
          (.venv)????python_hook?git:(master)???python?test_auto_hook.py?
          {'sql':?'SELECT?42;',?'db':?'mysql',?'user':?'root',?'host':?'127.0.0.1',?'port':?3306,?'result':?((42,),),?'speed?time':?0.000213623046875}

          4.直接替換方法

          可以看到上面的方法很好的運行了, 而且可以很方便的嵌入到我們的項目中, 但是依賴與sitecustomize.py文件很難讓他抽離成一個第三方的庫, 如果要抽離成第三方的庫就得考慮看看有沒有其他的方法。在上面介紹MetaPathLoader時說到了sys.module, 在里面通過sys.modules來減少重復引入:

          class?MetaPathLoader:

          ????def?load_module(self,?fullname):
          ????????#?import的模塊都會存放在sys.modules里面,?通過判斷可以減少重復import
          ????????if?fullname?in?sys.modules:
          ????????????return?sys.modules[fullname]
          ????????#?防止遞歸調(diào)用
          ????????finder?=?sys.meta_path.pop(0)
          ????????#?導入?module
          ????????module?=?importlib.import_module(fullname)
          ????????if?fullname?==?'time':
          ????????????#?替換函數(shù)
          ????????????module.sleep?=?func_wrapper(module.sleep)
          ????????sys.meta_path.insert(0,?finder)
          ????????return?module

          這個減少重復引入的原理是, 每次引入一個模塊后, 他就會存放在sys.modules, 如果是重復引入, 就會直接刷新成最新引入的模塊。上面之所以會考慮到減少重復import是因為我們不會在程序運行時升級第三方庫的依賴。利用到我們可以不考慮重復引入同名不同實現(xiàn)的模塊, 以及sys.modules會緩存引入模塊的特點, 我們可以把上面的邏輯簡化成引入模塊->替換當前模塊方法為我們修改的hook方法。第一次接觸這個方法是從opentracing-python-instrumentation:https://github.com/uber-common/opentracing-python-instrumentation學的, 不過他夾帶著其他的封裝, 所以我這里進行了簡化處理 github代碼倉:https://github.com/so1n/example/tree/master/example_python/example_python/python_hook/hook_by_global:

          import?time
          from?functools?import?wraps
          from?typing?import?Any,?Callable,?Tuple,?cast

          import?aiomysql


          def?func_wrapper(func:?Callable):
          ????"""和上面一樣的封裝函數(shù),?這里簡單略過"""
          ????
          #?判斷是否hook過
          _IS_HOOK:?bool?=?False
          #?存放原來的_query
          _query:?Callable?=?aiomysql.Cursor._query


          #?hook函數(shù)
          def?install_hook()?->?None:
          ????_IS_HOOK?=?False
          ????if?_IS_HOOK:
          ????????return
          ????aiomysql.Cursor._query?=?func_wrapper(aiomysql.Cursor._query)
          ????_IS_HOOK?=?True


          #?還原到原來的函數(shù)方法
          def?reset_hook()?->?None:
          ????aiomysql.Cursor._query?=?_query
          ????_IS_HOOK?=?False

          代碼簡單明了,接下來跑一跑剛才的測試:

          import?asyncio
          import?aiomysql

          from?demo?import?install_hook,?reset_hook


          async?def?test_mysql()?->?None:
          ????pool:?aiomysql.Pool?=?await?aiomysql.create_pool(
          ????????host='127.0.0.1',?port=3306,?user='root',?password='',?db='mysql'
          ????)
          ????async?with?pool.acquire()?as?conn:
          ????????async?with?conn.cursor()?as?cur:
          ????????????await?cur.execute("SELECT?42;")
          ????????????(r,)?=?await?cur.fetchone()
          ????????????assert?r?==?42
          ????pool.close()
          ????await?pool.wait_closed()

          print("install?hook")
          install_hook()
          asyncio.run(test_mysql())
          print("reset?hook")
          reset_hook()
          asyncio.run(test_mysql())
          print("end")

          通過測試輸出可以發(fā)現(xiàn)我們的邏輯的正確的, install hook后能出現(xiàn)我們提取的元信息, 而reset后則不會打印原信息

          install?hook
          {'sql':?'SELECT?42;',?'db':?'mysql',?'user':?'root',?'host':?'127.0.0.1',?'port':?3306,?'result':?((42,),),?'speed?time':?0.000347137451171875}
          reset?hook
          end

          5.總結(jié)

          得益于Python動態(tài)語言的特性, 我們可以很容易的為第三方庫實現(xiàn)鉤子方法,上面說的兩種方法中, 第二種方法非常簡單, 但在自己項目中最好還是采用第一種方法, 因為Python是通過一行一行代碼進行掃描執(zhí)行的, 第二種方法只能放在入口代碼中, 并且要在被hook的對象實例化之前執(zhí)行, 不然就會實現(xiàn)hook失敗的現(xiàn)象, 而第一種方法除了麻煩外, 基本上能躲避所有坑。

          Python貓技術(shù)交流群開放啦!群里既有國內(nèi)一二線大廠在職員工,也有國內(nèi)外高校在讀學生,既有十多年碼齡的編程老鳥,也有中小學剛剛?cè)腴T的新人,學習氛圍良好!想入群的同學,請在公號內(nèi)回復『交流群』,獲取貓哥的微信(謝絕廣告黨,非誠勿擾?。?/span>~


          還不過癮?試試它們




          如何簡化大量的 if…elif…else 代碼?

          重寫 500 Lines or Less 項目:流水線調(diào)度問題

          Python 列表解析式竟然支持異步?

          Python 性能測試工具 Locust 極簡入門

          解開 Python 中 self 的四個秘密!

          為什么說 Python 內(nèi)置函數(shù)并不是萬能的?


          如果你覺得本文有幫助
          請慷慨分享點贊,感謝啦
          瀏覽 27
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  一本道无码字慕 | 欧美一区二区黄色 | 色婷婷色综合 | 操鼻视频素材网站直接 | 日皮视频免费网站 |