Python如何異步發(fā)送日志到遠(yuǎn)程服務(wù)器?

作者:360質(zhì)量效能
來源:360質(zhì)量效能
背景
在Python中使用日志最常用的方式就是在控制臺和文件中輸出日志了,logging模塊也很好的提供的相應(yīng) 的類,使用起來也非常方便,但是有時我們可能會有一些需求,如還需要將日志發(fā)送到遠(yuǎn)端,或者直接寫入數(shù) 據(jù)庫,這種需求該如何實(shí)現(xiàn)呢?
StreamHandler和FileHandler
#?-*-?coding:?utf-8?-*-
"""
-------------------------------------------------
?File Name:?? loger
?Description?:
?Author?:????yangyanxing
?date:???? 2020/9/23
-------------------------------------------------
"""
import?logging
import?sys
import?os
#?初始化logger
logger?=?logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
#?設(shè)置日志格式
fmt?=?logging.Formatter('[%(asctime)s]?[%(levelname)s]?%(message)s',?'%Y-%m-%d
%H:%M:%S')
#?添加cmd?handler
cmd_handler?=?logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
#?添加文件的handler
logpath?=?os.path.join(os.getcwd(),?'debug.log')
file_handler?=?logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
#?將cmd和file?handler添加到logger中
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("今天天氣不錯")
先初始化一個logger, 并且設(shè)置它的日志級別是DEBUG,然后添初始化了 cmd_handler和 file_handler,最后將它們添加到logger中, 運(yùn)行腳本,會在cmd中打印出
[2020-09-23?10:45:56]?[DEBUG]?今天天氣不錯
添加HTTPHandler
#?添加一個httphandler
import?logging.handlers
http_handler?=?logging.handlers.HTTPHandler(r"127.0.0.1:1987",?'/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天氣不錯")
結(jié)果在服務(wù)端我們收到了很多信息
{
'name':?[b?'yyx'],
'msg':?[b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'args':?[b?'()'],
'levelname':?[b?'DEBUG'],
'levelno':?[b?'10'],
'pathname':?[b?'I:/workplace/yangyanxing/test/loger.py'],
'filename':?[b?'loger.py'],
'module':?[b?'loger'],
'exc_info':?[b?'None'],
'exc_text':?[b?'None'],
'stack_info':?[b?'None'],
'lineno':?[b?'41'],
'funcName':?[b?'' ],
'created':?[b?'1600831054.8881223'],
'msecs':?[b?'888.1223201751709'],
'relativeCreated':?[b?'22.99976348876953'],
'thread':?[b?'14876'],
'threadName':?[b?'MainThread'],
'processName':?[b?'MainProcess'],
'process':?[b?'8648'],
'message':?[b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'asctime':?[b?'2020-09-23?11:17:34']
}
可以說是信息非常之多,但是卻并不是我們想要的樣子,我們只是想要類似于
[2020-09-23?10:45:56][DEBUG]?今天天氣不錯
logging.handlers.HTTPHandler 只是簡單的將日志所有信息發(fā)送給服務(wù)端,至于服務(wù)端要怎么組織內(nèi) 容是由服務(wù)端來完成. 所以我們可以有兩種方法,一種是改服務(wù)端代碼,根據(jù)傳過來的日志信息重新組織一 下日志內(nèi)容, 第二種是我們重新寫一個類,讓它在發(fā)送的時候?qū)⒅匦赂袷交罩緝?nèi)容發(fā)送到服務(wù)端。
我們采用第二種方法,因?yàn)檫@種方法比較靈活, 服務(wù)端只是用于記錄,發(fā)送什么內(nèi)容應(yīng)該是由客戶端來決定。
我們需要重新定義一個類,我們可以參考 logging.handlers.HTTPHandler 這個類,重新寫一個httpHandler類
class?CustomHandler(logging.Handler):
??def?__init__(self,?host,?uri,?method="POST"):
????logging.Handler.__init__(self)
????self.url?=?"%s/%s"?%?(host,?uri)
????method?=?method.upper()
????if?method?not?in?["GET",?"POST"]:
??????raise?ValueError("method?must?be?GET?or?POST")
????self.method?=?method
??def?emit(self,?record):
????'''
???重寫emit方法,這里主要是為了把初始化時的baseParam添加進(jìn)來
???:param?record:
???:return:
???'''
????msg?=?self.format(record)
????if?self.method?==?"GET":
??????if?(self.url.find("?")?>=?0):
????????sep?=?'&'
??????else:
????????sep?=?'?'
??????url?=?self.url?+?"%c%s"?%?(sep,?urllib.parse.urlencode({"log":
msg}))
??????requests.get(url,?timeout=1)
????else:
??????headers?=?{
????????"Content-type":?"application/x-www-form-urlencoded",
????????"Content-length":?str(len(msg))
?????}
??????requests.post(self.url,?data={'log':?msg},?headers=headers,
timeout=1)
這行代碼表示,將會根據(jù)日志對象設(shè)置的格式返回對應(yīng)的內(nèi)容。
{'log':?[b'[2020-09-23?11:39:45]?[DEBUG]
\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99']}
將bytes類型轉(zhuǎn)一下就得到了
[2020-09-23?11:43:50]?[DEBUG]?今天天氣不錯
異步的發(fā)送遠(yuǎn)程日志
async?def?post(self):
??print(self.getParam('log'))
??await?asyncio.sleep(5)
??self.write({"msg":?'ok'})
此時我們再打印上面的日志
logger.debug("今天天氣不錯")
logger.debug("是風(fēng)和日麗的")
得到的輸出為
[2020-09-23?11:47:33]?[DEBUG]?今天天氣不錯
[2020-09-23?11:47:38]?[DEBUG]?是風(fēng)和日麗的
那么現(xiàn)在問題來了,原本只是一個記錄日志,現(xiàn)在卻成了拖累整個腳本的累贅,所以我們需要異步的來 處理遠(yuǎn)程寫日志。
1
使用多線程處理
def?emit(self,?record):
??msg?=?self.format(record)
??if?self.method?==?"GET":
????if?(self.url.find("?")?>=?0):
??????sep?=?'&'
????else:
??????sep?=?'?'
????url?=?self.url?+?"%c%s"?%?(sep,?urllib.parse.urlencode({"log":?msg}))
????t?=?threading.Thread(target=requests.get,?args=(url,))
????t.start()
??else:
????headers?=?{
??????"Content-type":?"application/x-www-form-urlencoded",
??????"Content-length":?str(len(msg))
???}
????t?=?threading.Thread(target=requests.post,?args=(self.url,),?kwargs=
{"data":{'log':?msg},
2
使用線程池處理
python 的 concurrent.futures 中有ThreadPoolExecutor, ProcessPoolExecutor類,是線程池和進(jìn)程池, 就是在初始化的時候先定義幾個線程,之后讓這些線程來處理相應(yīng)的函數(shù),這樣不用每次都需要新創(chuàng)建線程
exector?=?ThreadPoolExecutor(max_workers=1)?#?初始化一個線程池,只有一個線程
exector.submit(fn,?args,?kwargs)?#?將函數(shù)submit到線程池中
exector?=?ThreadPoolExecutor(max_workers=1)
def?emit(self,?record):
??msg?=?self.format(record)
??timeout?=?aiohttp.ClientTimeout(total=6)
??if?self.method?==?"GET":
????if?(self.url.find("?")?>=?0):
??????sep?=?'&'
????else:
??????sep?=?'?'
????url?=?self.url?+?"%c%s"?%?(sep,?urllib.parse.urlencode({"log":?msg}))
????exector.submit(requests.get,?url,?timeout=6)
??else:
????headers?=?{
??????"Content-type":?"application/x-www-form-urlencoded",
??????"Content-length":?str(len(msg))
???}
????exector.submit(requests.post,?self.url,?data={'log':?msg},
headers=headers,?timeout=6)
3
使用異步aiohttp庫來發(fā)送請求
class?CustomHandler(logging.Handler):
??def?__init__(self,?host,?uri,?method="POST"):
????logging.Handler.__init__(self)
????self.url?=?"%s/%s"?%?(host,?uri)
????method?=?method.upper()
????if?method?not?in?["GET",?"POST"]:
??????raise?ValueError("method?must?be?GET?or?POST")
????self.method?=?method
??async?def?emit(self,?record):
????msg?=?self.format(record)
????timeout?=?aiohttp.ClientTimeout(total=6)
????if?self.method?==?"GET":
??????if?(self.url.find("?")?>=?0):
????????sep?=?'&'
??????else:
????????sep?=?'?'
??????url?=?self.url?+?"%c%s"?%?(sep,?urllib.parse.urlencode({"log":
msg}))
??????async?with?aiohttp.ClientSession(timeout=timeout)?as?session:
??????async?with?session.get(self.url)?as?resp:
??????????print(await?resp.text())
??????else:
????????headers?=?{
????????"Content-type":?"application/x-www-form-urlencoded",
????????"Content-length":?str(len(msg))
?????}
??????async?with?aiohttp.ClientSession(timeout=timeout,?headers=headers)
as?session:
??????async?with?session.post(self.url,?data={'log':?msg})?as?resp:
??????????print(await?resp.text())
這時代碼執(zhí)行崩潰了
C:\Python37\lib\logging\__init__.py:894:?RuntimeWarning:?coroutine
'CustomHandler.emit'?was?never?awaited
self.emit(record)
RuntimeWarning:?Enable?tracemalloc?to?get?the?object?allocation?traceback
究其原因是由于emit方法中使用 async with session.post 函數(shù),它需要在一個使用async 修飾的函數(shù) 里執(zhí)行,所以修改emit函數(shù),使用async來修飾,這里emit函數(shù)變成了異步的函數(shù), 返回的是一個 coroutine 對象,要想執(zhí)行coroutine對象,需要使用await, 但是腳本里卻沒有在哪里調(diào)用 await emit() ,所以崩潰信息 中顯示 coroutine 'CustomHandler.emit' was never awaited。
async?def?main():
??await?logger.debug("今天天氣不錯")
??await?logger.debug("是風(fēng)和日麗的")
loop?=?asyncio.get_event_loop()
loop.run_until_complete(main())
執(zhí)行依然報錯
raise?TypeError('An?asyncio.Future,?a?coroutine?or?an?awaitable?is?'
這似乎就沒有辦法了,想要使用異步庫來發(fā)送,但是卻沒有可以調(diào)用await的地方。
import?asyncio
async?def?test(n):
?while?n?>?0:
???await?asyncio.sleep(1)
???print("test?{}".format(n))
???n?-=?1
?return?n
async?def?test2(n):
?while?n?>0:
???await?asyncio.sleep(1)
???print("test2?{}".format(n))
???n?-=?1
def?stoploop(task):
?print("執(zhí)行結(jié)束,?task?n?is?{}".format(task.result()))
?loop.stop()
loop?=?asyncio.get_event_loop()
task?=?loop.create_task(test(5))
task2?=?loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2?=?loop.create_task(test2(3))
loop.run_forever()
注意看上面的代碼,我們并沒有在某處使用await來執(zhí)行協(xié)程,而是通過將協(xié)程注冊到某個事件循環(huán)對象上, 然后調(diào)用該循環(huán)的 run_forever() 函數(shù),從而使該循環(huán)上的協(xié)程對象得以正常的執(zhí)行。
test?5
test2?3
test?4
test2?2
test?3
test2?1
test?2
test?1
執(zhí)行結(jié)束,?task?n?is?0
可以看到,使用事件循環(huán)對象創(chuàng)建的task,在該循環(huán)執(zhí)行run_forever() 以后就可以執(zhí)行了如果不執(zhí)行 loop.run_forever() 函數(shù),則注冊在它上面的協(xié)程也不會執(zhí)行
loop?=?asyncio.get_event_loop()
task?=?loop.create_task(test(5))
task.add_done_callback(stoploop)
task2?=?loop.create_task(test2(3))
time.sleep(5)
#?loop.run_forever()
loop?=?asyncio.get_event_loop()
class?CustomHandler(logging.Handler):
??def?__init__(self,?host,?uri,?method="POST"):
????logging.Handler.__init__(self)
????self.url?=?"%s/%s"?%?(host,?uri)
????method?=?method.upper()
????if?method?not?in?["GET",?"POST"]:
??????raise?ValueError("method?must?be?GET?or?POST")
????self.method?=?method
??#?使用aiohttp封裝發(fā)送數(shù)據(jù)函數(shù)
??async?def?submit(self,?data):
????timeout?=?aiohttp.ClientTimeout(total=6)
????if?self.method?==?"GET":
??????if?self.url.find("?")?>=?0:
????????sep?=?'&'
??????else:
????????sep?=?'?'
??????url?=?self.url?+?"%c%s"?%?(sep,?urllib.parse.urlencode({"log":
data}))
??????async?with?aiohttp.ClientSession(timeout=timeout)?as?session:
????????async?with?session.get(url)?as?resp:
??????????print(await?resp.text())
????else:
??????headers?=?{
????????"Content-type":?"application/x-www-form-urlencoded",
?????}
??????async?with?aiohttp.ClientSession(timeout=timeout,?headers=headers)
as?session:
????????async?with?session.post(self.url,?data={'log':?data})?as?resp:
??????????print(await?resp.text())
????return?True
??def?emit(self,?record):
????msg?=?self.format(record)
????loop.create_task(self.submit(msg))
#?添加一個httphandler
http_handler?=?CustomHandler(r"http://127.0.0.1:1987",?'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天氣不錯")
logger.debug("是風(fēng)和日麗的")
loop.run_forever()
loop.create_task(self.submit(msg)) 也可以使用
asyncio.ensure_future(self.submit(msg), loop=loop) 來代替,目的都是將協(xié)程對象注冊到事件循環(huán)中。

近期熱門文章推薦:

