中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何在python中使用asyncio模塊

發布時間:2021-03-17 16:26:52 來源:億速云 閱讀:252 作者:Leah 欄目:開發技術

今天就跟大家聊聊有關如何在python中使用asyncio模塊,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

import time
import asyncio


now = lambda : time.time()


async def do_some_work(x):
  print("waiting:", x)

start = now()
# 這里是一個協程對象,這個時候do_some_work函數并沒有執行
coroutine = do_some_work(2)
print(coroutine)
# 創建一個事件loop
loop = asyncio.get_event_loop()
# 將協程加入到事件循環loop
loop.run_until_complete(coroutine)

print("Time:",now()-start)

在上面帶中我們通過async關鍵字定義一個協程(coroutine),當然協程不能直接運行,需要將協程加入到事件循環loop中

asyncio.get_event_loop:創建一個事件循環,然后使用run_until_complete將協程注冊到事件循環,并啟動事件循環

創建一個task

協程對象不能直接運行,在注冊事件循環的時候,其實是run_until_complete方法將協程包裝成為了一個任務(task)對象. task對象是Future類的子類,保存了協程運行后的狀態,用于未來獲取協程的結果

import asyncio
import time


now = lambda: time.time()


async def do_some_work(x):
  print("waiting:", x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print("Time:",now()-start)

結果為:

<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex2.py:13>>
waiting: 2
<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex2.py:13> result=None>
Time: 0.0003514289855957031

創建task后,在task加入事件循環之前為pending狀態,當完成后,狀態為finished

關于上面通過loop.create_task(coroutine)創建task,同樣的可以通過 asyncio.ensure_future(coroutine)創建task

關于這兩個命令的官網解釋: https://docs.python.org/3/library/asyncio-task.html

asyncio.ensure_future(coro_or_future, *, loop=None)&para;
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

If the argument is a Future, it is returned directly.

https://docs.python.org/3/library/asyncio-eventloop.html

AbstractEventLoop.create_task(coro)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

Third-party event loops can use their own subclass of Task for interoperability. In this case, the result type is a subclass of Task.

This method was added in Python 3.4.2. Use the async() function to support also older Python versions.

綁定回調

綁定回調,在task執行完成的時候可以獲取執行的結果,回調的最后一個參數是future對象,通過該對象可以獲取協程返回值。

import time
import asyncio


now = lambda : time.time()


async def do_some_work(x):
  print("waiting:",x)
  return "Done after {}s".format(x)


def callback(future):
  print("callback:",future.result())


start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)

print("Time:", now()-start)

結果為:

<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>
waiting: 2
callback: Done after 2s
Time: 0.00039196014404296875

通過add_done_callback方法給task任務添加回調函數,當task(也可以說是coroutine)執行完成的時候,就會調用回調函數。并通過參數future獲取協程執行的結果。這里我們創建 的task和回調里的future對象實際上是同一個對象

阻塞和await

使用async可以定義協程對象,使用await可以針對耗時的操作進行掛起,就像生成器里的yield一樣,函數讓出控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其他的協程也掛起或者執行完畢,再進行下一個協程的執行

耗時的操作一般是一些IO操作,例如網絡請求,文件讀取等。我們使用asyncio.sleep函數來模擬IO操作。協程的目的也是讓這些IO操作異步化。

import asyncio
import time



now = lambda :time.time()

async def do_some_work(x):
  print("waiting:",x)
  # await 后面就是調用耗時的操作
  await asyncio.sleep(x)
  return "Done after {}s".format(x)


start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print("Task ret:", task.result())
print("Time:", now() - start)

在await asyncio.sleep(x),因為這里sleep了,模擬了阻塞或者耗時操作,這個時候就會讓出控制權。 即當遇到阻塞調用的函數的時候,使用await方法將協程的控制權讓出,以便loop調用其他的協程。

并發和并行

并發指的是同時具有多個活動的系統

并行值得是用并發來使一個系統運行的更快。并行可以在操作系統的多個抽象層次進行運用

所以并發通常是指有多個任務需要同時進行,并行則是同一個時刻有多個任務執行

下面這個例子非常形象:

并發情況下是一個老師在同一時間段輔助不同的人功課。并行則是好幾個老師分別同時輔助多個學生功課。簡而言之就是一個人同時吃三個饅頭還是三個人同時分別吃一個的情況,吃一個饅頭算一個任務

import asyncio
import time


now = lambda :time.time()


async def do_some_work(x):
  print("Waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
  asyncio.ensure_future(coroutine1),
  asyncio.ensure_future(coroutine2),
  asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
  print("Task ret:",task.result())

print("Time:",now()-start)

運行結果:

Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004154920578003

總時間為4s左右。4s的阻塞時間,足夠前面兩個協程執行完畢。如果是同步順序的任務,那么至少需要7s。此時我們使用了aysncio實現了并發。asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一個task列表,后者接收一堆task。

關于asyncio.gather和asyncio.wait官網的說明:

https://docs.python.org/3/library/asyncio-task.html

Return a future aggregating results from the given coroutine objects or futures.

All futures must share the same event loop. If all the tasks are done successfully, the returned future's result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

https://docs.python.org/3/library/asyncio-task.html

Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).

The sequence futures must not be empty.

timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

return_when indicates when this function should return.

協程嵌套

使用async可以定義協程,協程用于耗時的io操作,我們也可以封裝更多的io操作過程,這樣就實現了嵌套的協程,即一個協程中await了另外一個協程,如此連接起來。

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
  print("waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

async def main():
  coroutine1 = do_some_work(1)
  coroutine2 = do_some_work(2)
  coroutine3 = do_some_work(4)
  tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
  ]

  dones, pendings = await asyncio.wait(tasks)
  for task in dones:
    print("Task ret:", task.result())

  # results = await asyncio.gather(*tasks)
  # for result in results:
  #   print("Task ret:",result)


start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

如果我們把上面代碼中的:

  dones, pendings = await asyncio.wait(tasks)
  for task in dones:
    print("Task ret:", task.result())

替換為:

  results = await asyncio.gather(*tasks)
  for result in results:
    print("Task ret:",result)

這樣得到的就是一個結果的列表

不在main協程函數里處理結果,直接返回await的內容,那么最外層的run_until_complete將會返回main協程的結果。 將上述的代碼更改為:

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
  print("waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

async def main():
  coroutine1 = do_some_work(1)
  coroutine2 = do_some_work(2)
  coroutine3 = do_some_work(4)
  tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
  ]
  return await asyncio.gather(*tasks)

start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
  print("Task ret:",result)

print("Time:", now()-start)

或者返回使用asyncio.wait方式掛起協程。

將代碼更改為:

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
  print("waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

async def main():
  coroutine1 = do_some_work(1)
  coroutine2 = do_some_work(2)
  coroutine3 = do_some_work(4)
  tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
  ]
  return await asyncio.wait(tasks)

start = now()

loop = asyncio.get_event_loop()
done,pending = loop.run_until_complete(main())
for task in done:
  print("Task ret:",task.result())

print("Time:", now()-start)

也可以使用asyncio的as_completed方法

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
  print("waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

async def main():
  coroutine1 = do_some_work(1)
  coroutine2 = do_some_work(2)
  coroutine3 = do_some_work(4)
  tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
  ]
  for task in asyncio.as_completed(tasks):
    result = await task
    print("Task ret: {}".format(result))

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

從上面也可以看出,協程的調用和組合非常靈活,主要體現在對于結果的處理:如何返回,如何掛起

協程的停止

future對象有幾個狀態:

Pending Running Done Cacelled

創建future的時候,task為pending,事件循環調用執行的時候當然就是running,調用完畢自然就是done,如果需要停止事件循環,就需要先把task取消。可以使用asyncio.Task獲取事件循環的task

import asyncio
import time


now = lambda :time.time()


async def do_some_work(x):
  print("Waiting:",x)
  await asyncio.sleep(x)
  return "Done after {}s".format(x)

coroutine1 =do_some_work(1)
coroutine2 =do_some_work(2)
coroutine3 =do_some_work(2)

tasks = [
  asyncio.ensure_future(coroutine1),
  asyncio.ensure_future(coroutine2),
  asyncio.ensure_future(coroutine3),
]

start = now()

loop = asyncio.get_event_loop()
try:
  loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
  print(asyncio.Task.all_tasks())
  for task in asyncio.Task.all_tasks():
    print(task.cancel())
  loop.stop()
  loop.run_forever()
finally:
  loop.close()

print("Time:",now()-start)

啟動事件循環之后,馬上ctrl+c,會觸發run_until_complete的執行異常 KeyBorardInterrupt。然后通過循環asyncio.Task取消future。可以看到輸出如下:

Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>}
False
True
True
True
Time: 1.0707225799560547

True表示cannel成功,loop stop之后還需要再次開啟事件循環,最后在close,不然還會拋出異常

循環task,逐個cancel是一種方案,可是正如上面我們把task的列表封裝在main函數中,main函數外進行事件循環的調用。這個時候,main相當于最外出的一個task,那么處理包裝的main函數即可。

不同線程的事件循環

很多時候,我們的事件循環用于注冊協程,而有的協程需要動態的添加到事件循環中。一個簡單的方式就是使用多線程。當前線程創建一個事件循環,然后在新建一個線程,在新線程中啟動事件循環。當前線程不會被block。

import asyncio
from threading import Thread
import time

now = lambda :time.time()

def start_loop(loop):
  asyncio.set_event_loop(loop)
  loop.run_forever()

def more_work(x):
  print('More work {}'.format(x))
  time.sleep(x)
  print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)

啟動上述代碼之后,當前線程不會被block,新線程中會按照順序執行call_soon_threadsafe方法注冊的more_work方法, 后者因為time.sleep操作是同步阻塞的,因此運行完畢more_work需要大致6 + 3

新線程協程

import asyncio
import time
from threading import Thread

now = lambda :time.time()


def start_loop(loop):
  asyncio.set_event_loop(loop)
  loop.run_forever()

async def do_some_work(x):
  print('Waiting {}'.format(x))
  await asyncio.sleep(x)
  print('Done after {}s'.format(x))

def more_work(x):
  print('More work {}'.format(x))
  time.sleep(x)
  print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主線程中創建一個new_loop,然后在另外的子線程中開啟一個無限事件循環。 主線程通過run_coroutine_threadsafe新注冊協程對象。這樣就能在子線程中進行事件循環的并發操作,同時主線程又不會被block。一共執行的時間大概在6s左右。

看完上述內容,你們對如何在python中使用asyncio模塊有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

开化县| 南昌县| 江西省| 都匀市| 土默特右旗| 邹平县| 汪清县| 剑川县| 钟祥市| 宽甸| 泰兴市| 东乡族自治县| 鲁甸县| 长春市| 松滋市| 南郑县| 朝阳区| 寿宁县| 南靖县| 武宣县| 古蔺县| 陆良县| 曲周县| 金坛市| 东乌珠穆沁旗| 织金县| 名山县| 孝感市| 泸定县| 宕昌县| 钦州市| 汶上县| 柏乡县| 清镇市| 庄河市| 关岭| 海南省| 鄂托克旗| 藁城市| 三明市| 甘洛县|