中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python強大的信號庫blinker怎么使用

發布時間:2023-05-04 09:38:48 來源:億速云 閱讀:119 作者:iii 欄目:編程語言

本文小編為大家詳細介紹“Python強大的信號庫blinker怎么使用”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Python強大的信號庫blinker怎么使用”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。

一. 信號

信號是一種通知或者說通信的方式,信號分為發送方和接收方。發送方發送一種信號,接收方收到信號的進程會跳入信號處理函數,執行完后再跳回原來的位置繼續執行。

常見的 Linux 中的信號,通過鍵盤輸入 Ctrl+C,就是發送給系統一個信號,告訴系統退出當前進程。

信號的特點就是發送端通知訂閱者發生了什么。使用信號分為 3 步:定義信號,監聽信號,發送信號。

Python強大的信號庫blinker怎么使用

Python 中提供了信號概念的通信模塊,就是blinker。

Blinker 是一個基于 Python 的強大的信號庫,它既支持簡單的點對點通信,也支持點對多點的組播。Flask 的信號機制就是基于它建立的。Blinker 的內核雖然小巧,但是功能卻非常強大,它支持以下特性:

  • 支持注冊全局命名信號

  • 支持匿名信號

  • 支持自定義命名信號

  • 支持與接收者之間的持久連接與短暫連接

  • 通過弱引用實現與接收者之間的自動斷開連接

  • 支持發送任意大小的數據

  • 支持收集信號接收者的返回值

  • 線程安全

二. blinker 使用

安裝方法:

pip install blinker
2.1 命名信號
from blinker import signal
# 定義一個信號
s = signal('king')
def animal(args):
print('我是小鉆風,大王回來了,我要去巡山')
# 信號注冊一個接收者
s.connect(animal)
if "__main__" == __name__:
# 發送信號
s.send()

Python強大的信號庫blinker怎么使用

2.2 匿名信號

blinker 也支持匿名信號,就是不需要指定一個具體的信號值。創建的每一個匿名信號都是互相獨立的。

from blinker import Signal
s = Signal()
def animal(sender):
print('我是小鉆風,大王回來了,我要去巡山')
s.connect(animal)
if "__main__" == __name__:
s.send()
2.3 組播信號

組播信號是比較能體現出信號優點的特征。多個接收者注冊到信號上,發送者只需要發送一次就能傳遞信息到多個接收者。

from blinker import signal
s = signal('king')
def animal_one(args):
print(f'我是小鉆風,今天的口號是: {args}')
def animal_two(args):
print(f'我是大鉆風,今天的口號是: {args}')
s.connect(animal_one)
s.connect(animal_two)
if "__main__" == __name__:
s.send('大王叫我來巡山,抓個和尚做晚餐!')

Python強大的信號庫blinker怎么使用

2.4 接收方訂閱主題

接受方支持訂閱指定的主題,只有當指定的主題發送消息時才發送給接收方。這種方法很好的區分了不同的主題。

from blinker import signal
s = signal('king')
def animal(args):
print(f'我是小鉆風,{args} 是我大哥')
s.connect(animal, sender='大象')
if "__main__" == __name__:
for i in ['獅子', '大象', '大鵬']:
s.send(i)

Python強大的信號庫blinker怎么使用

2.5 裝飾器用法

除了可以函數注冊之外還有更簡單的信號注冊方法,那就是裝飾器。

from blinker import signal
s = signal('king')
@s.connect
def animal_one(args):
print(f'我是小鉆風,今天的口號是: {args}')
@s.connect
def animal_two(args):
print(f'我是大鉆風,今天的口號是: {args}')
if "__main__" == __name__:
s.send('大王叫我來巡山,抓個和尚做晚餐!')
2.6 可訂閱主題的裝飾器

connect的注冊方法用著裝飾器時有一個弊端就是不能夠訂閱主題,所以有更高級的connect_via方法支持訂閱主題。

from blinker import signal
s = signal('king')
@s.connect_via('大象')
def animal(args):
print(f'我是小鉆風,{args} 是我大哥')
if "__main__" == __name__:
for i in ['獅子', '大象', '大鵬']:
s.send(i)
2.7 檢查信號是否有接收者

如果對于一個發送者發送消息前要準備的耗時很長,為了避免沒有接收者導致浪費性能的情況,所以可以先檢查某一個信號是否有接收者,在確定有接收者的情況下才發送,做到精確。

from blinker import signal
s = signal('king')
q = signal('queue')
def animal(sender):
print('我是小鉆風,大王回來了,我要去巡山')
s.connect(animal)
if "__main__" == __name__:
res = s.receivers
print(res)
if res:
s.send()
res = q.receivers
print(res)
if res:
q.send()
else:
print("孩兒們都出去巡山了")
{4511880240:}
我是小鉆風,大王回來了,我要去巡山
{}
孩兒們都出去巡山了
2.8 檢查訂閱者是否訂閱了某個信號

也可以檢查訂閱者是否由某一個信號

from blinker import signal
s = signal('king')
q = signal('queue')
def animal(sender):
print('我是小鉆風,大王回來了,我要去巡山')
s.connect(animal)
if "__main__" == __name__:
res = s.has_receivers_for(animal)
print(res)
res = q.has_receivers_for(animal)
print(res)
True
False

三.基于 blinker 的 Flask 信號

Flask 集成 blinker 作為解耦應用的解決方案。在 Flask 中,信號的使用場景如:請求到來之前,請求結束之后。同時 Flask 也支持自定義信號。

3.1 簡單 Flask demo
from flask import Flask
app = Flask(__name__)
@app.route('/',methods=['GET','POST'],endpoint='index')
def index():
return 'hello blinker'
if __name__ == '__main__':
app.run()

訪問127.0.0.1:5000時,返回給瀏覽器hello blinker。

Python強大的信號庫blinker怎么使用
3.2 自定義信號

因為 Flask 集成了信號,所以在 Flask 中使用信號時從 Flask 中引入。

from flask import Flask
from flask.signals import _signals
app = Flask(__name__)
s = _signals.singal('msg')
def QQ(args):
print('you have msg from QQ')
s.connect(QQ)
@app.route('/',methods=['GET','POST'],endpoint='index')
def index():
s.send()
return 'hello blinker'
if __name__ == '__main__':
app.run()

Python強大的信號庫blinker怎么使用

3.3 Flask自帶信號

在 Flask 中除了可以自定義信號,還可以使用自帶信號。Flask 中自帶的信號有很多種,具體如下:

請求
request_started = _signals.signal('request-started')# 請求到來前執行
request_finished = _signals.signal('request-finished')# 請求結束后執行
模板渲染
before_render_template = _signals.signal('before-render-template')# 模板渲染前執行
template_rendered = _signals.signal('template-rendered')# 模板渲染后執行
請求執行
got_request_exception = _signals.signal('got-request-exception')# 請求執行出現異常時執行
request_tearing_down = _signals.signal('request-tearing-down')# 請求執行完畢后自動執行(無論成功與否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down') # 請求上下文執行完畢后自動執行(無論成功與否)
請求上下文中
appcontext_pushed = _signals.signal('appcontext-pushed')# 請求上下文push時執行
appcontext_popped = _signals.signal('appcontext-popped')# 請求上下文pop時執行
message_flashed = _signals.signal('message-flashed')# 調用flask在其中添加數據時,自動觸發

下面以請求到來之前為例,看 Flask 中信號如何使用

from flask import Flask
from flask.signals import _signals, request_started
import time
app = Flask(__name__)
def wechat(args):
print('you have msg from wechat')
# 從flask中引入已經定好的信號,注冊一個函數
request_started.connect(wechat)
@app.route('/',methods=['GET','POST'],endpoint='index')
def index():
return 'hello blinker'
if __name__ == '__main__':
app.run()

當請求到來時,Flask 會經過request_started 通知接受方,就是函數wechat,這時wechat函數先執行,然后才返回結果給瀏覽器。

Python強大的信號庫blinker怎么使用

但這種使用方法并不是很地道,因為信號并不支持異步方法,所以通常在生產環境中信號的接收者都是配置異步執行的框架,如 Python 中大名鼎鼎的異步框架 celery。

讀到這里,這篇“Python強大的信號庫blinker怎么使用”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

白山市| 波密县| 临夏市| 冀州市| 闸北区| 穆棱市| 陵水| 高要市| 奉贤区| 金湖县| 观塘区| 永平县| 台安县| 大英县| 调兵山市| 阳城县| 阆中市| 吉木萨尔县| 平和县| 礼泉县| 介休市| 赤峰市| 胶州市| 宜章县| 苍梧县| 云安县| 开远市| 拉萨市| 富顺县| 腾冲县| 江油市| 荥阳市| 巴林左旗| 白沙| 柘城县| 宿迁市| 探索| 邹平县| 北川| 柳河县| 浦城县|