您好,登錄后才能下訂單哦!
如何在Python中使用協程?相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
1、云計算,典型應用OpenStack。2、WEB前端開發,眾多大型網站均為Python開發。3.人工智能應用,基于大數據分析和深度學習而發展出來的人工智能本質上已經無法離開python。4、系統運維工程項目,自動化運維的標配就是python+Django/flask。5、金融理財分析,量化交易,金融分析。6、大數據分析。
協程的生成器的基本行為
這里有一個最簡單的協程代碼:
def simple_coroutine(): print('-> start') x = yield print('-> recived', x) sc = simple_coroutine() next(sc) sc.send('zhexiao')
解釋:
1. 協程使用生成器函數定義:定義體中有 yield 關鍵字。
2. yield 在表達式中使用;如果協程只需從客戶那里接收數據,那么產出的值是 None —— 這個值是隱式指定的,因為 yield 關鍵字右邊沒有表達式。
3. 首先要調用 next(…) 函數,因為生成器還沒啟動,沒在 yield 語句處暫停,所以一開始無法發送數據。
4. 調用send方法,把值傳給 yield 的變量,然后協程恢復,繼續執行下面的代碼,直到運行到下一個 yield 表達式,或者終止。
==注意:send方法只有當協程處于 GEN_SUSPENDED 狀態下時才會運作,所以我們使用 next() 方法激活協程到 yield 表達式處停止,或者我們也可以使用 sc.send(None),效果與 next(sc) 一樣==。
協程的四個狀態:
協程可以身處四個狀態中的一個。當前狀態可以使用inspect.getgeneratorstate(…) 函數確定,該函數會返回下述字符串中的一個:
1. GEN_CREATED:等待開始執行
2. GEN_RUNNING:解釋器正在執行
3. GEN_SUSPENED:在yield表達式處暫停
4. GEN_CLOSED:執行結束
==最先調用 next(sc) 函數這一步通常稱為“預激”(prime)協程==(即,讓協程向前執行到第一個 yield 表達式,準備好作為活躍的協程使用)。
import inspect def simple_coroutine(a): print('-> start') b = yield a print('-> recived', a, b) c = yield a + b print('-> recived', a, b, c) # run sc = simple_coroutine(5) next(sc) sc.send(6) # 5, 6 sc.send(7) # 5, 6, 7
示例:使用協程計算移動平均值
def averager(): total = 0.0 count = 0 avg = None while True: num = yield avg total += num count += 1 avg = total/count # run ag = averager() # 預激協程 print(next(ag)) # None print(ag.send(10)) # 10 print(ag.send(20)) # 15
解釋:
1. 調用 next(ag) 函數后,協程會向前執行到 yield 表達式,產出 average 變量的初始值——None。
2. 此時,協程在 yield 表達式處暫停。
3. 使用 send() 激活協程,把發送的值賦給 num,并計算出 avg 的值。
4. 使用 print 打印出 yield 返回的數據。
終止協程和異常處理
協程中未處理的異常會向上冒泡,傳給 next 函數或 send 方法的調用方(即觸發協程的對象)。
==終止協程的一種方式:發送某個哨符值,讓協程退出。內置的 None 和Ellipsis 等常量經常用作哨符值==。
顯式地把異常發給協程
從 Python 2.5 開始,客戶代碼可以在生成器對象上調用兩個方法,顯式地把異常發給協程。
generator.throw(exc_type[, exc_value[, traceback]])
致使生成器在暫停的 yield 表達式處拋出指定的異常。如果生成器處理了拋出的異常,代碼會向前執行到下一個 yield 表達式,而產出的值會成為調用 generator.throw方法得到的返回值。如果生成器沒有處理拋出的異常,異常會向上冒泡,傳到調用方的上下文中。
generator.close()
致使生成器在暫停的 yield 表達式處拋出 GeneratorExit 異常。如果生成器沒有處理這個異常,或者拋出了 StopIteration 異常(通常是指運行到結尾),調用方不會報錯。如果收到 GeneratorExit 異常,生成器一定不能產出值,否則解釋器會拋出RuntimeError 異常。生成器拋出的其他異常會向上冒泡,傳給調用方。
異常處理示例:
class DemoException(Exception): """ custom exception """ def handle_exception(): print('-> start') while True: try: x = yield except DemoException: print('-> run demo exception') else: print('-> recived x:', x) raise RuntimeError('this line should never run') he = handle_exception() next(he) he.send(10) # recived x: 10 he.send(20) # recived x: 20 he.throw(DemoException) # run demo exception he.send(40) # recived x: 40 he.close()
如果傳入無法處理的異常,則協程會終止:
he.throw(Exception) # run demo exception
yield from獲取協程的返回值
為了得到返回值,協程必須正常終止;然后生成器對象會拋出StopIteration 異常,異常對象的 value 屬性保存著返回的值。
==yield from 結構會在內部自動捕獲 StopIteration 異常==。對 yield from 結構來說,解釋器不僅會捕獲 StopIteration 異常,還會把value 屬性的值變成 yield from 表達式的值。
yield from基本用法
==在生成器 gen 中使用 yield from subgen() 時, subgen 會獲得控制權,把產出的值傳給 gen 的調用方,即調用方可以直接控制 subgen。與此同時, gen 會阻塞,等待 subgen 終止==。
下面2個函數的作用一樣,只是使用了 yield from 的更加簡潔:
def gen(): for c in 'AB': yield c print(list(gen())) def gen_new(): yield from 'AB' print(list(gen_new()))
==yield from x 表達式對 x 對象所做的第一件事是,調用 iter(x),從中獲取迭代器,因此, x 可以是任何可迭代的對象,這只是 yield from 最基礎的用法==。
yield from高級用法
==yield from 的主要功能是打開雙向通道,把最外層的調用方與最內層的子生成器連接起來,這樣二者可以直接發送和產出值,還可以直接傳入異常,而不用在位于中間的協程中添加大量處理異常的樣板代碼==。
yield from 專門的術語
委派生成器:包含 yield from 表達式的生成器函數。
子生成器:從 yield from 中 部分獲取的生成器。
圖示
解釋:
1. 委派生成器在 yield from 表達式處暫停時,調用方可以直接把數據發給子生成器。
2. 子生成器再把產出的值發給調用方。
3. 子生成器返回之后,解釋器會拋出 StopIteration 異常,并把返回值附加到異常對象上,此時委派生成器會恢復。
高級示例
from collections import namedtuple ResClass = namedtuple('Res', 'count average') # 子生成器 def averager(): total = 0.0 count = 0 average = None while True: term = yield if term is None: break total += term count += 1 average = total / count return ResClass(count, average) # 委派生成器 def grouper(storages, key): while True: # 獲取averager()返回的值 storages[key] = yield from averager() # 客戶端代碼 def client(): process_data = { 'boys_2': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3], 'boys_1': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46] } storages = {} for k, v in process_data.items(): # 獲得協程 coroutine = grouper(storages, k) # 預激協程 next(coroutine) # 發送數據到協程 for dt in v: coroutine.send(dt) # 終止協程 coroutine.send(None) print(storages) # run client()
解釋:
1. 外層 for 循環每次迭代會新建一個 grouper 實例,賦值給 coroutine 變量; grouper 是委派生成器。
2. 調用 next(coroutine),預激委派生成器 grouper,此時進入 while True 循環,調用子生成器 averager 后,在 yield from 表達式處暫停。
3. 內層 for 循環調用 coroutine.send(value),直接把值傳給子生成器 averager。同時,當前的 grouper 實例(coroutine)在 yield from 表達式處暫停。
4. 內層循環結束后, grouper 實例依舊在 yield from 表達式處暫停,因此, grouper函數定義體中為 results[key] 賦值的語句還沒有執行。
5. coroutine.send(None) 終止 averager 子生成器,子生成器拋出 StopIteration 異常并將返回的數據包含在異常對象的value中,yield from 可以直接抓取 StopItration 異常并將異常對象的 value 賦值給 results[key]
yield from的意義
子生成器產出的值都直接傳給委派生成器的調用方(即客戶端代碼)。
使用 send() 方法發給委派生成器的值都直接傳給子生成器。如果發送的值是None,那么會調用子生成器的 next() 方法。如果發送的值不是 None,那么會調用子生成器的 send() 方法。如果調用的方法拋出 StopIteration 異常,那么委派生成器恢復運行。任何其他異常都會向上冒泡,傳給委派生成器。
生成器退出時,生成器(或子生成器)中的 return expr 表達式會觸發 StopIteration(expr) 異常拋出。
yield from 表達式的值是子生成器終止時傳給 StopIteration 異常的第一個參數。
傳入委派生成器的異常,除了 GeneratorExit 之外都傳給子生成器的 throw() 方法。如果調用 throw() 方法時拋出 StopIteration 異常,委派生成器恢復運行。 StopIteration 之外的異常會向上冒泡,傳給委派生成器。
如果把 GeneratorExit 異常傳入委派生成器,或者在委派生成器上調用 close() 方法,那么在子生成器上調用 close() 方法,如果它有的話。如果調用close()方法導致異常拋出,那么異常會向上冒泡,傳給委派生成器;否則,委派生成器拋出GeneratorExit 異常。
使用案例
協程能自然地表述很多算法,例如仿真、游戲、異步 I/O,以及其他事件驅動型編程形式或協作式多任務。協程是 asyncio 包的基礎構建。通過仿真系統能說明如何使用協程代替線程實現并發的活動。
在仿真領域,進程這個術語指代模型中某個實體的活動,與操作系統中的進程無關。仿真系統中的一個進程可以使用操作系統中的一個進程實現,但是通常會使用一個線程或一個協程實現。
出租車示例
import collections # time 字段是事件發生時的仿真時間, # proc 字段是出租車進程實例的編號, # action 字段是描述活動的字符串。 Event = collections.namedtuple('Event', 'time proc action') def taxi_process(proc_num, trips_num, start_time=0): """ 每次改變狀態時創建事件,把控制權讓給仿真器 :param proc_num: :param trips_num: :param start_time: :return: """ time = yield Event(start_time, proc_num, 'leave garage') for i in range(trips_num): time = yield Event(time, proc_num, 'pick up people') time = yield Event(time, proc_num, 'drop off people') yield Event(time, proc_num, 'go home') # run t1 = taxi_process(1, 1) a = next(t1) print(a) # Event(time=0, proc=1, action='leave garage') b = t1.send(a.time + 6) print(b) # Event(time=6, proc=1, action='pick up people') c = t1.send(b.time + 12) print(c) # Event(time=18, proc=1, action='drop off people') d = t1.send(c.time + 1) print(d) # Event(time=19, proc=1, action='go home')
模擬控制臺控制3個出租車異步
import collections import queue import random # time 字段是事件發生時的仿真時間, # proc 字段是出租車進程實例的編號, # action 字段是描述活動的字符串。 Event = collections.namedtuple('Event', 'time proc action') def taxi_process(proc_num, trips_num, start_time=0): """ 每次改變狀態時創建事件,把控制權讓給仿真器 :param proc_num: :param trips_num: :param start_time: :return: """ time = yield Event(start_time, proc_num, 'leave garage') for i in range(trips_num): time = yield Event(time, proc_num, 'pick up people') time = yield Event(time, proc_num, 'drop off people') yield Event(time, proc_num, 'go home') class SimulateTaxi(object): """ 模擬出租車控制臺 """ def __init__(self, proc_map): # 保存排定事件的 PriorityQueue 對象, # 如果進來的是tuple類型,則默認使用tuple[0]做排序 self.events = queue.PriorityQueue() # procs_map 參數是一個字典,使用dict構建本地副本 self.procs = dict(proc_map) def run(self, end_time): """ 排定并顯示事件,直到時間結束 :param end_time: :return: """ for _, taxi_gen in self.procs.items(): leave_evt = next(taxi_gen) self.events.put(leave_evt) # 仿真系統的主循環 simulate_time = 0 while simulate_time < end_time: if self.events.empty(): print('*** end of events ***') break # 第一個事件的發生 current_evt = self.events.get() simulate_time, proc_num, action = current_evt print('taxi:', proc_num, ', at time:', simulate_time, ', ', action) # 準備下個事件的發生 proc_gen = self.procs[proc_num] next_simulate_time = simulate_time + self.compute_duration() try: next_evt = proc_gen.send(next_simulate_time) except StopIteration: del self.procs[proc_num] else: self.events.put(next_evt) else: msg = '*** end of simulation time: {} events pending ***' print(msg.format(self.events.qsize())) @staticmethod def compute_duration(): """ 隨機產生下個事件發生的時間 :return: """ duration_time = random.randint(1, 20) return duration_time # 生成3個出租車,現在全部都沒有離開garage taxis = {i: taxi_process(i, (i + 1) * 2, i * 5) for i in range(3)} # 模擬運行 st = SimulateTaxi(taxis) st.run(100)
看完上述內容,你們掌握如何在Python中使用協程的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。