中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用python中的信號通信blinker

發布時間:2021-10-22 10:08:29 來源:億速云 閱讀:165 作者:iii 欄目:開發技術

這篇文章主要介紹“如何使用python中的信號通信blinker”,在日常操作中,相信很多人在如何使用python中的信號通信blinker問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何使用python中的信號通信blinker”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

信號:

信號是一種通知或者說通信的方式,信號分為發送方和接收方。發送方發送一中信號,接收方收到信號的進程會跳入信號處理函數,執行完后再跳回原來的位置繼續執行。常見的linux中的信號,通過鍵盤輸入Ctrl+C,就是發送給系統一個信號,告訴系統退出當前進程。

信號的特點就是發送端通知訂閱者發生了什么。使用信號分為3步,定義信號,監聽信號,發送信號

如何使用python中的信號通信blinker

python中提供了信號概念的通信模塊,就是blinker

官方介紹:

Blinker 是一個基于Python的強大的信號庫,它既支持簡單的點對點通信,也支持點對多點的組播。Flask的信號機制就是基于它建立的。Blinker的內核雖然小巧,但是功能卻非常強大,它支持以下特性:

  • 支持注冊全局命名信號

  • 支持匿名信號

  • 支持自定義命名信號

  • 支持與接收者之間的持久連接與短暫連接

  • 通過弱引用實現與接收者之間的自動斷開連接

  • 支持發送任意大小的數據

  • 支持收集信號接收者的返回值

  • 線程安全

blinker 使用

安裝方法:

pip install blinker

命名信號

from blinker import signal

# 定義一個信號
s = signal('king')


def animal(args):
    print('我是小鉆風,大王回來了,我要去巡山')

# 信號注冊一個接收者
s.connect(animal)

if "__main__" == __name__:
    # 發送信號
    s.send()

如何使用python中的信號通信blinker

匿名信號

blinker也支持匿名信號,就是不需要指定一個具體的信號值。創建的每一個匿名信號都是互相獨立的。

from blinker import Signal

s = Signal()

def animal(sender):
    print('我是小鉆風,大王回來了,我要去巡山')

s.connect(animal)

if "__main__" == __name__:
    s.send()

組播信號

組播信號是比較能體現出信號優點的特征。多個接收者注冊到信號上,發送者只需要發送一次就能傳遞信息到多個接收者。

from blinker import signal

s = signal('king')


def animal_one(args):
    print(f'我是小鉆風,今天的口號是: {args}')

def animal_two(args):
    print(f'我是大鉆風,今天的口號是: {args}')


s.connect(animal_one)
s.connect(animal_two)

if "__main__" == __name__:
    s.send('大王叫我來巡山,抓個和尚做晚餐!')

如何使用python中的信號通信blinker

接收方訂閱主題

接受方支持訂閱指定的主題,只有當指定的主題發送消息時才發送給接收方。這種方法很好的區分了不同的主題。

from blinker import signal

s = signal('king')


def animal(args):
    print(f'我是小鉆風,{args} 是我大哥')

s.connect(animal, sender='大象')

if "__main__" == __name__:
    for i in ['獅子', '大象', '大鵬']:
        s.send(i)

如何使用python中的信號通信blinker

裝飾器用法

除了可以函數注冊之外還有更簡單的信號注冊方法,那就是裝飾器。

from blinker import signal

s = signal('king')

@s.connect
def animal_one(args):
    print(f'我是小鉆風,今天的口號是: {args}')

@s.connect
def animal_two(args):
    print(f'我是大鉆風,今天的口號是: {args}')

if "__main__" == __name__:
    s.send('大王叫我來巡山,抓個和尚做晚餐!')

可訂閱主題的裝飾器

connect的注冊方法用著裝飾器時有一個弊端就是不能夠訂閱主題,所以有更高級的connect_via方法支持訂閱主題。

from blinker import signal

s = signal('king')

@s.connect_via('大象')
def animal(args):
    print(f'我是小鉆風,{args} 是我大哥')


if "__main__" == __name__:
    for i in ['獅子', '大象', '大鵬']:
        s.send(i)

檢查信號是否有接收者

如果對于一個發送者發送消息前要準備的耗時很長,為了避免沒有接收者導致浪費性能的情況,所以可以先檢查某一個信號是否有接收者,在確定有接收者的情況下才發送,做到精確。

from blinker import signal

s = signal('king')
q = signal('queue')


def animal(sender):
    print('我是小鉆風,大王回來了,我要去巡山')

s.connect(animal)


if "__main__" == __name__:
    
    res = s.receivers
    print(res)
    if res:
        s.send()
    
    res = q.receivers
    print(res)
    if res:
        q.send()
    else:
        print("孩兒們都出去巡山了")
{4511880240: <weakref at 0x10d02ae80; to 'function' at 0x10cedd430 (animal)>}
我是小鉆風,大王回來了,我要去巡山
{}
孩兒們都出去巡山了

檢查訂閱者是否訂閱了某個信號

也可以檢查訂閱者是否由某一個信號

from blinker import signal

s = signal('king')
q = signal('queue')


def animal(sender):
    print('我是小鉆風,大王回來了,我要去巡山')

s.connect(animal)


if "__main__" == __name__:
    
    res = s.has_receivers_for(animal)
    print(res)

    res = q.has_receivers_for(animal)
    print(res)
True
False

基于blinker的Flask信號

Flask集成blinker作為解耦應用的解決方案。在Flask中,信號的使用場景如:請求到來之前,請求結束之后。同時Flask也支持自定義信號。

簡單 Flask demo

from flask import Flask

app = Flask(__name__)

@app.route('/',methods=['GET','POST'],endpoint='index')
def index():
    return 'hello blinker'

if __name__ == '__main__':
    app.run()

訪問127.0.0.1:5000時,返回給瀏覽器hello blinker

如何使用python中的信號通信blinker

自定義信號

因為flask集成了信號,所以在flask中使用信號時從flask中引入。

from flask.signals import _signals
from flask import Flask
from flask.signals import _signals

app = Flask(__name__)

s = _signals.singal('msg')


def QQ(args):
    print('you have msg from QQ')

s.connect(QQ)

@app.route('/',methods=['GET','POST'],endpoint='index')
def index():
    s.send()
    return 'hello blinker'

if __name__ == '__main__':
    app.run()

如何使用python中的信號通信blinker

Flask自帶信號

在Flask中除了可以自定義信號,還可以使用自帶信號。Flask中自帶的信號有很多種,具體如下:

請求
request_started = _signals.signal('request-started')                # 請求到來前執行
request_finished = _signals.signal('request-finished')              # 請求結束后執行
 
模板渲染
before_render_template = _signals.signal('before-render-template')  # 模板渲染前執行
template_rendered = _signals.signal('template-rendered')            # 模板渲染后執行
 
請求執行
got_request_exception = _signals.signal('got-request-exception')    # 請求執行出現異常時執行
request_tearing_down = _signals.signal('request-tearing-down')      # 請求執行完畢后自動執行(無論成功與否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down') # 請求上下文執行完畢后自動執行(無論成功與否)
 
請求上下文中
appcontext_pushed = _signals.signal('appcontext-pushed')            # 請求上下文push時執行
appcontext_popped = _signals.signal('appcontext-popped')            # 請求上下文pop時執行

message_flashed = _signals.signal('message-flashed')                # 調用flask在其中添加數據時,自動觸發

下面以請求到來之前為例,看flask中信號如何使用

from flask import Flask
from flask.signals import _signals, request_started
import time

app = Flask(__name__)

def wechat(args):
    print('you have msg from wechat')

# 從flask中引入已經定好的信號,注冊一個函數
request_started.connect(wechat)

@app.route('/',methods=['GET','POST'],endpoint='index')
def index():
    return 'hello blinker'

if __name__ == '__main__':
    app.run()

當請求到來時,flask會經過request_started 通知接受方,就是函數wechat,這時wechat函數先執行,然后才返回結果給瀏覽器。

如何使用python中的信號通信blinker

但這種使用方法并不是很地道,因為信號并不支持異步方法,所以通常在生產環境中信號的接收者都是配置異步執行的框架,如python中大名鼎鼎的異步框架celery。

總結

信號的優點:

  • 解耦應用:將串行運行的耦合應用分解為多級執行

  • 發布訂閱者:減少調用者的使用,一次調用通知多個訂閱者

信號的缺點:

  • 不支持異步

  • 支持訂閱主題的能力有限

到此,關于“如何使用python中的信號通信blinker”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

横山县| 剑川县| 大名县| 岳阳县| 沧州市| 南江县| 张掖市| 南木林县| 龙井市| 拉孜县| 牙克石市| 民县| 商水县| 广水市| 垦利县| 大余县| 铜鼓县| 广丰县| 利川市| 朝阳市| 神木县| 浦城县| 合阳县| 临颍县| 巴里| 临泽县| 大渡口区| 正阳县| 潞城市| 东平县| 西贡区| 五常市| 清丰县| 雷波县| 台中县| 分宜县| 苍溪县| 平安县| 札达县| 晋州市| 新晃|