.. versionadded:: 0.6
从 Flask 0.6 开始,Flask 中集成支持信号。这种支持是由优秀的 blinker 库提供,并且当它不可用时会优雅地退回。
信号是什么?信号通过在核心框架的其它地方或 Flask 扩展的动作发生时的发送通知来帮助你解耦应用。 简而言之,信号允许特定的发送端通知订阅者发生了什么。
Flask 提供了一些信号且其它的扩展提供更多的信号。另外请注意信号是用于通知订阅者,而不应该鼓励订阅者修改数据。 你会注意到信号看起来和一些内置的装饰器做同样的事情(如: :data:`~flask.request_started` 同 :meth:`~flask.Flask.before_request` 是非常类似)。然而在工作方式上它们存在不同。譬如核心的 :meth:`~flask.Flask.before_request` 处理程序以特定的顺 序执行,并且可以在返回响应之前放弃请求。相比之下,所有的信号处理器执行的顺序没有定义,并且不修改任何数据。
信号对比处理器最大的优势就是你可以在一瞬间安全地订阅它们。例如,这些临时的订阅对测试很有帮助。 比如说你想要知道哪个模板被作为请求的一部分渲染:信号允许你完全地了解这些。
你可以使用信号的 :meth:`~blinker.base.Signal.connect` 方法来订阅信号。第一个参数是信号发出时要调用的函数,第二个参数是可选的,用于确定信号的发送者。 你可以用 :meth:`~blinker.base.Signal.disconnect` 方法来退订信号。
对于所有的核心 Flask 信号,发送者都是发出信号的应用。当你订阅一个信号,请 确保也提供一个发送者,除非你确实想监听全部应用的信号。这在你开发一个扩展的时候尤其正确。
比如这里有一个用于在单元测试中找出哪个模板被渲染和传入模板的变量的助手上下文管理器:
from flask import template_rendered from contextlib import contextmanager @contextmanager def captured_templates(app): recorded = [] def record(sender, template, context, **extra): recorded.append((template, context)) template_rendered.connect(record, app) try: yield recorded finally: template_rendered.disconnect(record, app)
现在可以轻松地与测试客户端配对:
with captured_templates(app) as templates: rv = app.test_client().get('/') assert rv.status_code == 200 assert len(templates) == 1 template, context = templates[0] assert template.name == 'index.html' assert len(context['items']) == 10
确保订阅使用了一个额外的 **extra
参数,这样当 Flask 对信号引入新参数时你的调用不会失败。
代码中,从 with 块的应用 app 中流出的渲染的所有模板现在会被记录到 templates 变量。无论何时模板被渲染,模板对象的上下文中添加上它。
此外,也有一个方便的助手方法(:meth:`~blinker.base.Signal.connected_to`) ,它允许你临时地用它自己的上下文管理器把函数订阅到信号。因为上下文管理器的返回值不能按那种方法给定, 则需要把列表作为参数传入:
from flask import template_rendered def captured_templates(app, recorded, **extra): def record(sender, template, context): recorded.append((template, context)) return template_rendered.connected_to(record, app)
上面的例子看起来像这样:
templates = [] with captured_templates(app, templates, **extra): ... template, context = templates[0]
Blinker API Changes
:meth:`~blinker.base.Signal.connected_to` 方法出现于版本1.1中。
如果你要在自己的应用中使用信号,你可以直接使用 blinker 库。最常见的使用情况是命名一个自定义的 :class:`~blinker.base.Namespace` 的信号。这也是大多数时候推荐的做法:
from blinker import Namespace my_signals = Namespace()
现在你可以像这样创建新的信号:
model_saved = my_signals.signal('model-saved')
这里使用唯一的信号名并且简化调试。可以用 :attr:`~blinker.base.NamedSignal.name` 属性来访问信号名。
对扩展开发者
如果你正在编写一个 Flask 扩展,你想优雅地减少缺少 blinker 安装的影响,你可以这样做使用 :class:`flask.signals.Namespace` 类。
如果你想要发送信号,你可以通过调用 :meth:`~blinker.base.Signal.send` 方法来达到目的。它接受一个发件者作为第一个参数和一些可选的被转发到信号用户的关键字参数:
class Model(object): ... def save(self): model_saved.send(self)
永远尝试选择一个合适的发送者。如果你有一个发出信号的类,把 self 作为发送者。如果你从一个随机的函数发出信号,把 current_app._get_current_object()
作为发送者。
传入代理作为发送者
绝不要向信号传入 :data:`~flask.current_app` 作为发送者。相反使用 current_app._get_current_object()
。
这样做的理由是 :data:`~flask.current_app` 是一个代理不是真正的应用对象。
信号在接受时,完全支持 :ref:`request-context` 。 上下文本地的变量在 :data:`~flask.request_started` 和 :data:`~flask.request_finished` 一贯可用, 所以你可以信任 :class:`flask.g` 和其它需要的东西。 注意 :ref:`signals-sending` 和 :data:`~flask.request_tearing_down` 信号中描述的限制。
在 Blinker 1.1 中通过使用新的 :meth:`~blinker.base.NamedSignal.connect_via` 装饰器你也能够轻易地订阅信号:
from flask import template_rendered @template_rendered.connect_via(app) def when_template_rendered(sender, template, context, **extra): print 'Template %s is rendered with %s' % (template.name, context)
Flask 中存在如下信号:
.. data:: flask.template_rendered :noindex: 当一个模版成功地渲染,这个信号会被发送。这个信号与模板实例 `template` 和上下文的词典(名为 `context` )一起调用。 订阅示例:: def log_template_renders(sender, template, context, **extra): sender.logger.debug('Rendering template "%s" with context %s', template.name or 'string template', context) from flask import template_rendered template_rendered.connect(log_template_renders, app)
.. data:: flask.request_started :noindex: 这个信号在建立请求上下文之外的任何请求处理开始前发送。因为请求上下文已经被绑定, 订阅者可以用 :class:`~flask.request` 之类的标准全局代理访问请求。 订阅示例:: def log_request(sender, **extra): sender.logger.debug('Request context is set up') from flask import request_started request_started.connect(log_request, app)
.. data:: flask.request_finished :noindex: 这个信号恰好在请求发送给客户端之前发送。它传递名为 `response` 的响应。 订阅示例:: def log_response(sender, response, **extra): sender.logger.debug('Request context is about to close down. ' 'Response: %s', response) from flask import request_finished request_finished.connect(log_response, app)
.. data:: flask.got_request_exception :noindex: 这个信号在请求处理中抛出异常时发送。它在标准异常处理生效 *之前* ,甚至是在没有异常处理的情况下发送。 异常本身会通过 `exception` 传递到订阅函数。 订阅示例:: def log_exception(sender, exception, **extra): sender.logger.debug('Got exception during processing: %s', exception) from flask import got_request_exception got_request_exception.connect(log_exception, app)
.. data:: flask.request_tearing_down :noindex: 这个信号在请求销毁时发送。它总是被调用,即使发生异常。当前监听这个信号的函数会在常规销毁处理后被调用,但这不是你可以信赖的。 订阅示例:: def close_db_connection(sender, **extra): session.close() from flask import request_tearing_down request_tearing_down.connect(close_db_connection, app) 从 Flask 0.9 ,如果有异常它会被传递一个 `exc` 关键字参数引用导致销毁的异常。
.. data:: flask.appcontext_tearing_down :noindex: 这个信号在应用上下文销毁时发送。它总是被调用,即使发生异常。 当前监听这个信号的函数会在常规销毁处理后被调用,但这不是你可以信赖的。 订阅示例:: def close_db_connection(sender, **extra): session.close() from flask import appcontext_tearing_down appcontext_tearing_down.connect(close_db_connection, app) 如果有异常它会被传递一个 `exc` 关键字参数引用导致销毁的异常。
.. data:: flask.appcontext_pushed :noindex: 当应用上下文被推入的时候,发送信号。应用是发送者。这个信号通常对才测试有用,能够临时地勾住( hook )信息。 比如它能够被用于早期设置一个资源到 `g` 对象。 用法示例:: from contextlib import contextmanager from flask import appcontext_pushed @contextmanager def user_set(app, user): def handler(sender, **kwargs): g.user = user with appcontext_pushed.connected_to(handler, app): yield 在测试代码中:: def test_user_me(self): with user_set(app, 'john'): c = app.test_client() resp = c.get('/users/me') assert resp.data == 'username=john' .. versionadded:: 0.10
.. data:: appcontext_popped 当应用的上下文被弹出的时候,信号被发送。应用就是发送者。这个信号一般随着 :data:`appcontext_tearing_down` 信号使用。 .. versionadded:: 0.10
.. data:: flask.message_flashed :noindex: 当应用闪现一个消息的时候发送信号。消息是作为 `message` 关键字参数并且类别是作为 `category`。 订阅示例:: recorded = [] def record(sender, message, category, **extra): recorded.append((message, category)) from flask import message_flashed message_flashed.connect(record, app) .. versionadded:: 0.10