一文打盡 Python-web 開發(fā)的 signal 機(jī)制
signal在flask/django中都是很重要的解耦手段。flask的signal依賴blinker實(shí)現(xiàn),django的signal也很類似。blinker庫是純python實(shí)現(xiàn)的代碼簡(jiǎn)單,功能強(qiáng)大的signal庫。本文我們從blinker開始,一起了解python-web開發(fā)的signal機(jī)制:
- blinker的api
- blinker-signal的實(shí)現(xiàn)
- flask-signal的實(shí)現(xiàn)
- django-signal的實(shí)現(xiàn)
- weakref介紹
- 小結(jié)
- 小技巧
blinker簡(jiǎn)介
blinker源碼使用 1.4 版本, 項(xiàng)目結(jié)構(gòu)如下:
| 文件 | 描述 |
|---|---|
| base.py | 核心邏輯 |
| _saferef.py | 安全引用相關(guān)邏輯 |
| _utilities.py | 工具類 |
blinker的API
blinker的api使用示例:
from?blinker?import?signal
def?subscriber1(sender):
????print("1?Got?a?signal?sent?by?%r"?%?sender)
def?subscriber2(sender):
????print("2?Got?a?signal?sent?by?%r"?%?sender)
ready?=?signal('ready')
print(ready)
ready.connect(subscriber1)
ready.connect(subscriber2)
ready.send("go")
示例的日志輸出:
<blinker.base.NamedSignal?object?at?0x7f93a805ad00;?'ready'>
1?Got?a?signal?sent?by?'go'
2?Got?a?signal?sent?by?'go'
可以看到signal是發(fā)布/訂閱模式。或者換個(gè)更常見的說法,事件中心:
ready = signal('ready')創(chuàng)建名為ready的事件中心ready.connect(subscriber1)給ready事件中心添加事件監(jiān)聽器ready.send("go")向ready事件中心派發(fā)事件,這樣事件監(jiān)聽器會(huì)收到事件并進(jìn)行處理
signal的實(shí)現(xiàn)
signal默認(rèn)單例,提供開箱即用的API:
class?NamedSignal(Signal):
????"""A?named?generic?notification?emitter."""
????def?__init__(self,?name,?doc=None):
????????Signal.__init__(self,?doc)
????????self.name?=?name
class?Namespace(dict):
????def?signal(self,?name,?doc=None):
????????try:
????????????return?self[name]
????????except?KeyError:
????????????return?self.setdefault(name,?NamedSignal(name,?doc))
signal?=?Namespace().signal
需要說明一下的是,signal的單例是和name綁定的。同一個(gè)名稱得到同一個(gè)NamedSignal對(duì)象,不同名稱得到的NamedSignal對(duì)象不一樣。
NamedSignal的父類Signal的構(gòu)造方法,包括1)事件接收器字典receivers:以事件接收器id為key和事件接收器為value;2)接收器ID-發(fā)送器ID的字典:以接收器ID為key和發(fā)送器ID集合為value;3)和2類似的字典,只不過是反向的,key為發(fā)送器ID,value為接收器集合。
ANY?=?symbol('ANY')
class?Signal(object):
????ANY?=?ANY
????
????def?__init__(self,?doc=None)
????????self.receivers?=?{}
????????self._by_receiver?=?defaultdict(set)
????????self._by_sender?=?defaultdict(set)
????????...
Signal的connect函數(shù)添加消息接收器,可以看到sender和receiver是多對(duì)多的關(guān)系。
def?connect(self,?receiver,?sender=ANY,?weak=True):
????receiver_id?=?hashable_identity(receiver)
????receiver_ref?=?receiver
????sender_id?=?ANY_ID
????self.receivers.setdefault(receiver_id,?receiver_ref)
????self._by_sender[sender_id].add(receiver_id)
????self._by_receiver[receiver_id].add(sender_id)
????del?receiver_ref
????return?receiver
Signal的send函數(shù)將消息發(fā)送給所有關(guān)注該sender的接收器:
def?send(self,?*sender,?**kwargs):
????sender?=?sender[0]
????#?循環(huán)執(zhí)行所有的receiver
????return?[(receiver,?receiver(sender,?**kwargs))
????????????for?receiver?in?self.receivers_for(sender)]
def?receivers_for(self,?sender):
????sender_id?=?hashable_identity(sender)
????#?根據(jù)sender_id找receiver_id
????if?sender_id?in?self._by_sender:
????????#?2個(gè)set的合集?
????????ids?=?(self._by_sender[ANY_ID]?|
???????????????self._by_sender[sender_id])
????else:
????????ids?=?self._by_sender[ANY_ID].copy()
????for?receiver_id?in?ids:
????????receiver?=?self.receivers.get(receiver_id)
????????if?receiver?is?None:
????????????continue
????????#?迭代器
????????yield?receiver
有始有終,Signal使用disconnect函數(shù)注銷消息的接收器:
def?disconnect(self,?receiver,?sender=ANY):
????sender_id?=?ANY_ID
????receiver_id?=?hashable_identity(receiver)
????self._disconnect(receiver_id,?sender_id)
def?_disconnect(self,?receiver_id,?sender_id):
????if?sender_id?==?ANY_ID:
????????if?self._by_receiver.pop(receiver_id,?False):
????????????for?bucket?in?self._by_sender.values():
????????????????bucket.discard(receiver_id)
????????self.receivers.pop(receiver_id,?None)
????else:
????????self._by_sender[sender_id].discard(receiver_id)
????????self._by_receiver[receiver_id].discard(sender_id)
為了便于理解signal機(jī)制,我們暫時(shí)忽略了weakref相關(guān)的代碼,稍后再進(jìn)行介紹。
flask-signal的實(shí)現(xiàn)
flask-signal依賴blinker的實(shí)現(xiàn):
#?flask.signals.py
from?blinker?import?Namespace
_signals?=?Namespace()
template_rendered?=?_signals.signal("template-rendered")
before_render_template?=?_signals.signal("before-render-template")
request_started?=?_signals.signal("request-started")
request_finished?=?_signals.signal("request-finished")
request_tearing_down?=?_signals.signal("request-tearing-down")
got_request_exception?=?_signals.signal("got-request-exception")
appcontext_tearing_down?=?_signals.signal("appcontext-tearing-down")
appcontext_pushed?=?_signals.signal("appcontext-pushed")
appcontext_popped?=?_signals.signal("appcontext-popped")
message_flashed?=?_signals.signal("message-flashed")
從上面代碼可以看到flask使用blinker預(yù)制了多個(gè)signal。以request_started為例, flask在處理request時(shí)候會(huì)向request_started派發(fā)事件:
#?flask.app.py
from?.signals?import?request_started
def?full_dispatch_request(self):
????...
????request_started.send(self)
????...
我們可以在自己的代碼中,這樣注冊(cè)事件監(jiān)聽:
def?log_request(sender,?**extra):
????sender.logger.debug('Request?context?is?set?up')
from?flask?import?request_started
request_started.connect(log_request,?app)
這樣就可以很方便的使用signal獲取到flask在各個(gè)階段的數(shù)據(jù)。
django-signal的實(shí)現(xiàn)
django-signal雖然是獨(dú)立實(shí)現(xiàn),但是模式和blinker非常類似。Signal構(gòu)造函數(shù)創(chuàng)建了一個(gè)對(duì)象,充當(dāng)事件中心。
#?django/dispatch/dispatcher.py
def?_make_id(target):
????if?hasattr(target,?'__func__'):
????????return?(id(target.__self__),?id(target.__func__))
????return?id(target)
NONE_ID?=?_make_id(None)
#?A?marker?for?caching
NO_RECEIVERS?=?object()
class?Signal:
????def?__init__(self,?providing_args=None,?use_caching=False):
????????"""
????????Create?a?new?signal.
????????"""
????????self.receivers?=?[]
????????self.lock?=?threading.Lock()
????????self.use_caching?=?use_caching
????????self.sender_receivers_cache?=?weakref.WeakKeyDictionary()?if?use_caching?else?{}
????????self._dead_receivers?=?False
connect核心功能就是為事件監(jiān)聽器構(gòu)建唯一標(biāo)識(shí)(receiver_id,sender_id),然后加入receivers數(shù)組。
def?connect(self,?receiver,?sender=None,?weak=True,?dispatch_uid=None):
????from?django.conf?import?settings
????lookup_key?=?(_make_id(receiver),?_make_id(sender))
????ref?=?weakref.ref
????receiver_object?=?receiver
????receiver?=?ref(receiver)
????with?self.lock:
????????if?not?any(r_key?==?lookup_key?for?r_key,?_?in?self.receivers):
????????????self.receivers.append((lookup_key,?receiver))
send函數(shù)和blinker的send類似:
def?send(self,?sender,?**named):
????return?[
????????(receiver,?receiver(signal=self,?sender=sender,?**named))
????????for?receiver?in?self._live_receivers(sender)
????]
def?_live_receivers(self,?sender):
????with?self.lock:
????????senderkey?=?_make_id(sender)
????????receivers?=?[]
????????for?(receiverkey,?r_senderkey),?receiver?in?self.receivers:
????????????if?r_senderkey?==?NONE_ID?or?r_senderkey?==?senderkey:
????????????????receivers.append(receiver)
????...
????non_weak_receivers?=?[]
????for?receiver?in?receivers:
????????non_weak_receivers.append(receiver)
????return?non_weak_receivers
django-signal額外提供了一個(gè)receiver裝飾器,方便業(yè)務(wù)使用:
def?receiver(signal,?**kwargs):
????
????def?_decorator(func):
????????if?isinstance(signal,?(list,?tuple)):
????????????for?s?in?signal:
????????????????s.connect(func,?**kwargs)
????????else:
????????????signal.connect(func,?**kwargs)
????????return?func
????return?_decorator
django的model中額外包裝了ModelSignal類并且預(yù)制了一些signal:
class?ModelSignal(Signal):
????def?_lazy_method(self,?method,?apps,?receiver,?sender,?**kwargs):
????????from?django.db.models.options?import?Options
????????#?This?partial?takes?a?single?optional?argument?named?"sender".
????????partial_method?=?partial(method,?receiver,?**kwargs)
????????if?isinstance(sender,?str):
????????????apps?=?apps?or?Options.default_apps
????????????apps.lazy_model_operation(partial_method,?make_model_tuple(sender))
????????else:
????????????return?partial_method(sender)
????def?connect(self,?receiver,?sender=None,?weak=True,?dispatch_uid=None,?apps=None):
????????self._lazy_method(
????????????super().connect,?apps,?receiver,?sender,
????????????weak=weak,?dispatch_uid=dispatch_uid,
????????)
...
#?定義模型各個(gè)階段的signal
pre_init?=?ModelSignal(use_caching=True)
post_init?=?ModelSignal(use_caching=True)
pre_save?=?ModelSignal(use_caching=True)
post_save?=?ModelSignal(use_caching=True)
pre_delete?=?ModelSignal(use_caching=True)
post_delete?=?ModelSignal(use_caching=True)
m2m_changed?=?ModelSignal(use_caching=True)
pre_migrate?=?Signal()
signal的使用方式在receiver裝飾器的注釋中有介紹:
"""
A?decorator?for?connecting?receivers?to?signals.?Used?by?passing?in?the
signal?(or?list?of?signals)?and?keyword?arguments?to?connect::
????@receiver(post_save,?sender=MyModel)
????def?signal_receiver(sender,?**kwargs):
????????...
????@receiver([post_save,?post_delete],?sender=MyModel)
????def?signals_receiver(sender,?**kwargs):
????????...
"""
這樣利用signal機(jī)制,可以對(duì)MyModel進(jìn)行一些額外的邏輯處理,又避免了代碼的硬耦合。
weakref 介紹
了解了signal的各種實(shí)現(xiàn)和使用后,我們?cè)倩仡^學(xué)習(xí)blinker-signal中另外一個(gè)環(huán)節(jié)weakref。weakref可以顯著提高signal的性能, 請(qǐng)看下面示例:
def?test_weak_value_dict(cache):
????c_list?=?[]
????class?C:
????????def?method(self):
????????????return?("method?called!",?id(self))
????c1?=?C()
????c2?=?C()
????c3?=?C()
????c_list.append(c1)
????c_list.append(c2)
????c_list.append(c3)
????del?c1,?c2,?c3
????def?do_cache(cache,?name,?target):
????????cache[name]?=?target
????for?idx,?target?in?enumerate(c_list):
????????do_cache(cache,?idx,?target)
????for?k,?v?in?cache.items():
????????print("before",?k,?v.method())
????del?c_list
????gc.collect()
????for?x,?y?in?cache.items():
????????print("after",?x,?y.method())
test_weak_value_dict({})
print("=="?*?10)
test_weak_value_dict(weakref.WeakValueDictionary())
在test_weak_value_dict函數(shù)中,創(chuàng)建了3個(gè)對(duì)象,將對(duì)象放到一個(gè)列表和cache中,完成后再刪除對(duì)象和對(duì)象列表并進(jìn)行g(shù)c。如果cache的實(shí)現(xiàn)是set,那么gc后cache中任然存在3個(gè)對(duì)象,也就是對(duì)象不會(huì)回收;如果是使用WeakValueDictionary實(shí)現(xiàn)的cache,則部分對(duì)象進(jìn)行了回收。在一個(gè)事件中心,如果監(jiān)聽函數(shù)取消后卻無法釋放回收,內(nèi)存會(huì)持續(xù)增長(zhǎng)。
before?0?('method?called!',?140431874960640)
before?1?('method?called!',?140431874959440)
before?2?('method?called!',?140431874959968)
after?0?('method?called!',?140431874960640)
after?1?('method?called!',?140431874959440)
after?2?('method?called!',?140431874959968)
====================
before?0?('method?called!',?140431875860416)
before?1?('method?called!',?140431875860128)
before?2?('method?called!',?140431876163136)
after?2?('method?called!',?140431876163136)signal 小結(jié)
到這里我們可以知道blinker/flask/django的signal都是單純的python消息中心,和我們之前在gunicorn中使用的系統(tǒng) signal 完全不一樣。消息中心,可以用來進(jìn)行業(yè)務(wù)邏輯的解耦,一般就包括三步:
- 注冊(cè)監(jiān)聽器
- 派發(fā)事件
- 注銷監(jiān)聽器
小技巧
blinker中提供了一種 單例模式 的實(shí)現(xiàn)參考,我把它叫做 分組單例 , 組名相同會(huì)得到同一個(gè)對(duì)象實(shí)例:
class?_symbol(object):
????def?__init__(self,?group):
????????"""Construct?a?new?group?symbol."""
????????#?原文是name,我把它換成了group,感覺這樣更容易理解一些
????????self.__group__?=?self.group?=?group
????def?__reduce__(self):
????????return?symbol,?(self.group,)
????def?__repr__(self):
????????return?self.group
_symbol.__group__?=?'symbol'
class?symbol(object):
????"""A?constant?symbol.
????# group相同的symbol是同一個(gè)對(duì)象
????>>>?symbol('foo')?is?symbol('foo')
????True
????>>>?symbol('foo')
????foo
????"""
????symbols?=?{}
????def?__new__(cls,?group):
????????try:
????????????return?cls.symbols[group]
????????except?KeyError:
????????????return?cls.symbols.setdefault(group,?_symbol(group))
ANY?=?symbol('ANY')??#?單例
參考鏈接:
- https://pythonhosted.org/blinker/
- https://stackify.com/python-garbage-collection/
- https://www.cnblogs.com/TM0831/p/10599716.html
- https://www.geeksforgeeks.org/weak-references-in-python/
- https://pymotw.com/2/weakref/


實(shí)戰(zhàn):Flask + Vue 生成漂亮的詞云


Python實(shí)戰(zhàn) | 基于 Flask 部署 Keras 深度學(xué)習(xí)模型
