<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python如何異步發(fā)送日志到遠(yuǎn)程服務(wù)器?

          共 11034字,需瀏覽 23分鐘

           ·

          2020-10-24 08:00

          ?△點(diǎn)擊上方Python貓”關(guān)注 ,回復(fù)“1”領(lǐng)取電子書

          作者:360質(zhì)量效能

          來源:360質(zhì)量效能


          背景



          在Python中使用日志最常用的方式就是在控制臺和文件中輸出日志了,logging模塊也很好的提供的相應(yīng) 的類,使用起來也非常方便,但是有時我們可能會有一些需求,如還需要將日志發(fā)送到遠(yuǎn)端,或者直接寫入數(shù) 據(jù)庫,這種需求該如何實(shí)現(xiàn)呢?



          StreamHandler和FileHandler

          首先我們先來寫一套簡單輸出到cmd和文件中的代碼
          #?-*-?coding:?utf-8?-*-
          """
          -------------------------------------------------
          ?File Name:?? loger
          ?Description?:
          ?Author?:????yangyanxing
          ?date:???? 2020/9/23
          -------------------------------------------------
          """

          import?logging
          import?sys
          import?os
          #?初始化logger
          logger?=?logging.getLogger("yyx")
          logger.setLevel(logging.DEBUG)
          #?設(shè)置日志格式
          fmt?=?logging.Formatter('[%(asctime)s]?[%(levelname)s]?%(message)s',?'%Y-%m-%d
          %H:%M:%S'
          )
          #?添加cmd?handler
          cmd_handler?=?logging.StreamHandler(sys.stdout)
          cmd_handler.setLevel(logging.DEBUG)
          cmd_handler.setFormatter(fmt)
          #?添加文件的handler
          logpath?=?os.path.join(os.getcwd(),?'debug.log')
          file_handler?=?logging.FileHandler(logpath)
          file_handler.setLevel(logging.DEBUG)
          file_handler.setFormatter(fmt)
          #?將cmd和file?handler添加到logger中
          logger.addHandler(cmd_handler)
          logger.addHandler(file_handler)
          logger.debug("今天天氣不錯")

          先初始化一個logger, 并且設(shè)置它的日志級別是DEBUG,然后添初始化了 cmd_handler和 file_handler,最后將它們添加到logger中, 運(yùn)行腳本,會在cmd中打印出

          [2020-09-23?10:45:56]?[DEBUG]?今天天氣不錯
          且會寫入到當(dāng)前目錄下的debug.log文件中



          添加HTTPHandler

          如果想要在記錄時將日志發(fā)送到遠(yuǎn)程服務(wù)器上,可以添加一個 HTTPHandler , 在python標(biāo)準(zhǔn)庫logging.handler中,已經(jīng)為我們定義好了很多handler,有些我們可以直接用,本地使用tornado寫一個接收 日志的接口,將接收到的參數(shù)全都打印出來
          #?添加一個httphandler
          import?logging.handlers
          http_handler?=?logging.handlers.HTTPHandler(r"127.0.0.1:1987",?'/api/log/get')
          http_handler.setLevel(logging.DEBUG)
          http_handler.setFormatter(fmt)
          logger.addHandler(http_handler)
          logger.debug("今天天氣不錯")

          結(jié)果在服務(wù)端我們收到了很多信息

          {
          'name':?[b?'yyx'],
          'msg':?[b
          '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
          'args':?[b?'()'],
          'levelname':?[b?'DEBUG'],
          'levelno':?[b?'10'],
          'pathname':?[b?'I:/workplace/yangyanxing/test/loger.py'],
          'filename':?[b?'loger.py'],
          'module':?[b?'loger'],
          'exc_info':?[b?'None'],
          'exc_text':?[b?'None'],
          'stack_info':?[b?'None'],
          'lineno':?[b?'41'],
          'funcName':?[b?''],
          'created':?[b?'1600831054.8881223'],
          'msecs':?[b?'888.1223201751709'],
          'relativeCreated':?[b?'22.99976348876953'],
          'thread':?[b?'14876'],
          'threadName':?[b?'MainThread'],
          'processName':?[b?'MainProcess'],
          'process':?[b?'8648'],
          'message':?[b
          '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
          'asctime':?[b?'2020-09-23?11:17:34']
          }

          可以說是信息非常之多,但是卻并不是我們想要的樣子,我們只是想要類似于

          [2020-09-23?10:45:56][DEBUG]?今天天氣不錯
          這樣的日志

          logging.handlers.HTTPHandler 只是簡單的將日志所有信息發(fā)送給服務(wù)端,至于服務(wù)端要怎么組織內(nèi) 容是由服務(wù)端來完成. 所以我們可以有兩種方法,一種是改服務(wù)端代碼,根據(jù)傳過來的日志信息重新組織一 下日志內(nèi)容, 第二種是我們重新寫一個類,讓它在發(fā)送的時候?qū)⒅匦赂袷交罩緝?nèi)容發(fā)送到服務(wù)端。

          我們采用第二種方法,因?yàn)檫@種方法比較靈活, 服務(wù)端只是用于記錄,發(fā)送什么內(nèi)容應(yīng)該是由客戶端來決定。

          我們需要重新定義一個類,我們可以參考 logging.handlers.HTTPHandler 這個類,重新寫一個httpHandler類

          每個日志類都需要重寫emit方法,記錄日志時真正要執(zhí)行是也就是這個emit方法
          class?CustomHandler(logging.Handler):
          ??def?__init__(self,?host,?uri,?method="POST"):
          ????logging.Handler.__init__(self)
          ????self.url?=?"%s/%s"?%?(host,?uri)
          ????method?=?method.upper()
          ????if?method?not?in?["GET",?"POST"]:
          ??????raise?ValueError("method?must?be?GET?or?POST")
          ????self.method?=?method
          ??def?emit(self,?record):
          ????'''
          ???重寫emit方法,這里主要是為了把初始化時的baseParam添加進(jìn)來
          ???:param?record:
          ???:return:
          ???'''

          ????msg?=?self.format(record)
          ????if?self.method?==?"GET":
          ??????if?(self.url.find("?")?>=?0):
          ????????sep?=?'&'
          ??????else:
          ????????sep?=?'?'
          ??????url?=?self.url?+?"%c%s"?%?(sep,?urllib.parse.urlencode({"log":
          msg}))
          ??????requests.get(url,?timeout=1)
          ????else:
          ??????headers?=?{
          ????????"Content-type":?"application/x-www-form-urlencoded",
          ????????"Content-length":?str(len(msg))
          ?????}
          ??????requests.post(self.url,?data={'log':?msg},?headers=headers,
          timeout=1)
          上面代碼中有一行定義發(fā)送的參數(shù) msg = self.format(record)

          這行代碼表示,將會根據(jù)日志對象設(shè)置的格式返回對應(yīng)的內(nèi)容。

          之后再將內(nèi)容通過requests庫進(jìn)行發(fā)送,無論使用get 還是post方式,服務(wù)端都可以正常的接收到日志
          {'log':?[b'[2020-09-23?11:39:45]?[DEBUG]
          \xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'
          ]}

          將bytes類型轉(zhuǎn)一下就得到了

          [2020-09-23?11:43:50]?[DEBUG]?今天天氣不錯



          異步的發(fā)送遠(yuǎn)程日志

          現(xiàn)在我們考慮一個問題,當(dāng)日志發(fā)送到遠(yuǎn)程服務(wù)器過程中,如果遠(yuǎn)程服務(wù)器處理的很慢,會耗費(fèi)一定的時間, 那么這時記錄日志就會都變慢修改服務(wù)器日志處理類,讓其停頓5秒鐘,模擬長時間的處理流程
          async?def?post(self):
          ??print(self.getParam('log'))
          ??await?asyncio.sleep(5)
          ??self.write({"msg":?'ok'})

          此時我們再打印上面的日志

          logger.debug("今天天氣不錯")
          logger.debug("是風(fēng)和日麗的")

          得到的輸出為

          [2020-09-23?11:47:33]?[DEBUG]?今天天氣不錯
          [2020-09-23?11:47:38]?[DEBUG]?是風(fēng)和日麗的
          我們注意到,它們的時間間隔也是5秒。

          那么現(xiàn)在問題來了,原本只是一個記錄日志,現(xiàn)在卻成了拖累整個腳本的累贅,所以我們需要異步的來 處理遠(yuǎn)程寫日志。


          1

          使用多線程處理

          首先想的是應(yīng)該是用多線程來執(zhí)行發(fā)送日志方法
          def?emit(self,?record):
          ??msg?=?self.format(record)
          ??if?self.method?==?"GET":
          ????if?(self.url.find("?")?>=?0):
          ??????sep?=?'&'
          ????else:
          ??????sep?=?'?'
          ????url?=?self.url?+?"%c%s"?%?(sep,?urllib.parse.urlencode({"log":?msg}))
          ????t?=?threading.Thread(target=requests.get,?args=(url,))
          ????t.start()
          ??else:
          ????headers?=?{
          ??????"Content-type":?"application/x-www-form-urlencoded",
          ??????"Content-length":?str(len(msg))
          ???}
          ????t?=?threading.Thread(target=requests.post,?args=(self.url,),?kwargs=
          {"data":{'log':?msg},
          這種方法是可以達(dá)到不阻塞主目的,但是每打印一條日志就需要開啟一個線程,也是挺浪費(fèi)資源的。我們也 可以使用線程池來處理


          2

          使用線程池處理

          python 的 concurrent.futures 中有ThreadPoolExecutor, ProcessPoolExecutor類,是線程池和進(jìn)程池, 就是在初始化的時候先定義幾個線程,之后讓這些線程來處理相應(yīng)的函數(shù),這樣不用每次都需要新創(chuàng)建線程

          線程池的基本使用
          exector?=?ThreadPoolExecutor(max_workers=1)?#?初始化一個線程池,只有一個線程
          exector.submit(fn,?args,?kwargs)?#?將函數(shù)submit到線程池中
          如果線程池中有n個線程,當(dāng)提交的task數(shù)量大于n時,則多余的task將放到隊列中。
          再次修改上面的emit函數(shù)
          exector?=?ThreadPoolExecutor(max_workers=1)
          def?emit(self,?record):
          ??msg?=?self.format(record)
          ??timeout?=?aiohttp.ClientTimeout(total=6)
          ??if?self.method?==?"GET":
          ????if?(self.url.find("?")?>=?0):
          ??????sep?=?'&'
          ????else:
          ??????sep?=?'?'
          ????url?=?self.url?+?"%c%s"?%?(sep,?urllib.parse.urlencode({"log":?msg}))
          ????exector.submit(requests.get,?url,?timeout=6)
          ??else:
          ????headers?=?{
          ??????"Content-type":?"application/x-www-form-urlencoded",
          ??????"Content-length":?str(len(msg))
          ???}
          ????exector.submit(requests.post,?self.url,?data={'log':?msg},
          headers=headers,?timeout=6)
          這里為什么要只初始化一個只有一個線程的線程池? 因?yàn)檫@樣的話可以保證先進(jìn)隊列里的日志會先被發(fā) 送,如果池子中有多個線程,則不一定保證順序了。

          3

          使用異步aiohttp庫來發(fā)送請求

          上面的CustomHandler類中的emit方法使用的是requests.post來發(fā)送日志,這個requests本身是阻塞運(yùn) 行的,也正上由于它的存在,才使得腳本卡了很長時間,所們我們可以將阻塞運(yùn)行的requests庫替換為異步 的aiohttp來執(zhí)行g(shù)et和post方法, 重寫一個CustomHandler中的emit方法
          class?CustomHandler(logging.Handler):
          ??def?__init__(self,?host,?uri,?method="POST"):
          ????logging.Handler.__init__(self)
          ????self.url?=?"%s/%s"?%?(host,?uri)
          ????method?=?method.upper()
          ????if?method?not?in?["GET",?"POST"]:
          ??????raise?ValueError("method?must?be?GET?or?POST")
          ????self.method?=?method
          ??async?def?emit(self,?record):
          ????msg?=?self.format(record)
          ????timeout?=?aiohttp.ClientTimeout(total=6)
          ????if?self.method?==?"GET":
          ??????if?(self.url.find("?")?>=?0):
          ????????sep?=?'&'
          ??????else:
          ????????sep?=?'?'
          ??????url?=?self.url?+?"%c%s"?%?(sep,?urllib.parse.urlencode({"log":
          msg}))
          ??????async?with?aiohttp.ClientSession(timeout=timeout)?as?session:
          ??????async?with?session.get(self.url)?as?resp:
          ??????????print(await?resp.text())
          ??????else:
          ????????headers?=?{
          ????????"Content-type":?"application/x-www-form-urlencoded",
          ????????"Content-length":?str(len(msg))
          ?????}
          ??????async?with?aiohttp.ClientSession(timeout=timeout,?headers=headers)
          as?session:
          ??????async?with?session.post(self.url,?data={'log':?msg})?as?resp:
          ??????????print(await?resp.text())

          時代碼執(zhí)行崩潰了

          C:\Python37\lib\logging\__init__.py:894:?RuntimeWarning:?coroutine
          'CustomHandler.emit'?was?never?awaited
          self.emit(record)
          RuntimeWarning:?Enable?tracemalloc?to?get?the?object?allocation?traceback
          服務(wù)端也沒有收到發(fā)送日志的請求。

          究其原因是由于emit方法中使用 async with session.post 函數(shù),它需要在一個使用async 修飾的函數(shù) 里執(zhí)行,所以修改emit函數(shù),使用async來修飾,這里emit函數(shù)變成了異步的函數(shù), 返回的是一個 coroutine 對象,要想執(zhí)行coroutine對象,需要使用await, 但是腳本里卻沒有在哪里調(diào)用 await emit() ,所以崩潰信息 中顯示 coroutine 'CustomHandler.emit' was never awaited。

          既然emit方法返回的是一個coroutine對象,那么我們將它放一個loop中執(zhí)行
          async?def?main():
          ??await?logger.debug("今天天氣不錯")
          ??await?logger.debug("是風(fēng)和日麗的")
          loop?=?asyncio.get_event_loop()
          loop.run_until_complete(main())

          執(zhí)行依然報錯

          raise?TypeError('An?asyncio.Future,?a?coroutine?or?an?awaitable?is?'
          意思是需要的是一個coroutine,但是傳進(jìn)來的對象不是。

          這似乎就沒有辦法了,想要使用異步庫來發(fā)送,但是卻沒有可以調(diào)用await的地方。

          解決辦法是有的,我們使用 asyncio.get_event_loop() 獲取一個事件循環(huán)對象, 我們可以在這個對象上注冊很多協(xié)程對象,這樣當(dāng)執(zhí)行事件循環(huán)的時候,就是去執(zhí)行注冊在該事件循環(huán)上的協(xié)程, 我們通過一個小例子來看一下
          import?asyncio
          async?def?test(n):
          ?while?n?>?0:
          ???await?asyncio.sleep(1)
          ???print("test?{}".format(n))
          ???n?-=?1
          ?return?n

          async?def?test2(n):
          ?while?n?>0:
          ???await?asyncio.sleep(1)
          ???print("test2?{}".format(n))
          ???n?-=?1
          def?stoploop(task):
          ?print("執(zhí)行結(jié)束,?task?n?is?{}".format(task.result()))
          ?loop.stop()
          loop?=?asyncio.get_event_loop()
          task?=?loop.create_task(test(5))
          task2?=?loop.create_task(test2(3))
          task.add_done_callback(stoploop)
          task2?=?loop.create_task(test2(3))
          loop.run_forever()
          我們使用 loop = asyncio.get_event_loop() 創(chuàng)建了一個事件循環(huán)對象loop, 并且在loop上創(chuàng)建了兩個task, 并且給task1添加了一個回調(diào)函數(shù),在task1它執(zhí)行結(jié)束以后,將loop停掉。

          注意看上面的代碼,我們并沒有在某處使用await來執(zhí)行協(xié)程,而是通過將協(xié)程注冊到某個事件循環(huán)對象上, 然后調(diào)用該循環(huán)的 run_forever() 函數(shù),從而使該循環(huán)上的協(xié)程對象得以正常的執(zhí)行。

          上面得到的輸出為
          test?5
          test2?3
          test?4
          test2?2
          test?3
          test2?1
          test?2
          test?1
          執(zhí)行結(jié)束,?task?n?is?0

          可以看到,使用事件循環(huán)對象創(chuàng)建的task,在該循環(huán)執(zhí)行run_forever() 以后就可以執(zhí)行了如果不執(zhí)行 loop.run_forever() 函數(shù),則注冊在它上面的協(xié)程也不會執(zhí)行

          loop?=?asyncio.get_event_loop()
          task?=?loop.create_task(test(5))
          task.add_done_callback(stoploop)
          task2?=?loop.create_task(test2(3))
          time.sleep(5)
          #?loop.run_forever()
          上面的代碼將loop.run_forever() 注釋掉,換成time.sleep(5) 停5秒, 這時腳本不會有任何輸出,在停了5秒 以后就中止了,
          回到之前的日志發(fā)送遠(yuǎn)程服務(wù)器的代碼,我們可以使用aiohttp封裝一個發(fā)送數(shù)據(jù)的函數(shù), 然后在emit中將 這個函數(shù)注冊到全局的事件循環(huán)對象loop中,最后再執(zhí)行l(wèi)oop.run_forever()
          loop?=?asyncio.get_event_loop()
          class?CustomHandler(logging.Handler):
          ??def?__init__(self,?host,?uri,?method="POST"):
          ????logging.Handler.__init__(self)
          ????self.url?=?"%s/%s"?%?(host,?uri)
          ????method?=?method.upper()
          ????if?method?not?in?["GET",?"POST"]:
          ??????raise?ValueError("method?must?be?GET?or?POST")
          ????self.method?=?method
          ??#?使用aiohttp封裝發(fā)送數(shù)據(jù)函數(shù)
          ??async?def?submit(self,?data):
          ????timeout?=?aiohttp.ClientTimeout(total=6)
          ????if?self.method?==?"GET":
          ??????if?self.url.find("?")?>=?0:
          ????????sep?=?'&'
          ??????else:
          ????????sep?=?'?'
          ??????url?=?self.url?+?"%c%s"?%?(sep,?urllib.parse.urlencode({"log":
          data}))
          ??????async?with?aiohttp.ClientSession(timeout=timeout)?as?session:
          ????????async?with?session.get(url)?as?resp:
          ??????????print(await?resp.text())
          ????else:
          ??????headers?=?{
          ????????"Content-type":?"application/x-www-form-urlencoded",
          ?????}
          ??????async?with?aiohttp.ClientSession(timeout=timeout,?headers=headers)
          as?session:
          ????????async?with?session.post(self.url,?data={'log':?data})?as?resp:
          ??????????print(await?resp.text())
          ????return?True
          ??def?emit(self,?record):
          ????msg?=?self.format(record)
          ????loop.create_task(self.submit(msg))
          #?添加一個httphandler
          http_handler?=?CustomHandler(r"http://127.0.0.1:1987",?'api/log/get')
          http_handler.setLevel(logging.DEBUG)
          http_handler.setFormatter(fmt)
          logger.addHandler(http_handler)
          logger.debug("今天天氣不錯")
          logger.debug("是風(fēng)和日麗的")
          loop.run_forever()
          時腳本就可以正常的異步執(zhí)行了

          loop.create_task(self.submit(msg)) 也可以使用

          asyncio.ensure_future(self.submit(msg), loop=loop) 來代替,目的都是將協(xié)程對象注冊到事件循環(huán)中。

          但這種方式有一點(diǎn)要注意,loop.run_forever() 將會一直阻塞,所以需要有個地方調(diào)用 loop.stop() 方法. 可以注冊到某個task的回調(diào)中。
          Python貓技術(shù)交流群開放啦!群里既有國內(nèi)一二線大廠在職員工,也有國內(nèi)外高校在讀學(xué)生,既有十多年碼齡的編程老鳥,也有中小學(xué)剛剛?cè)腴T的新人,學(xué)習(xí)氛圍良好!想入群的同學(xué),請在公號內(nèi)回復(fù)『交流群』,獲取貓哥的微信(謝絕廣告黨,非誠勿擾!)~

          近期熱門文章推薦:

          如果只推薦一本 Python 書,我要 Pick 它!

          耗時兩年,我終于出了一本電子書!

          如何建立一個完美的 Python 項(xiàng)目?

          OrderedDict 是如何保證 Key 的插入順序的?

          感謝創(chuàng)作者的好文
          瀏覽 63
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天色天天日天天射 | 亚洲免费无码高清 | 亚洲A片在线观看 | 日本一级黄色 | 精品777777 |