您好,登錄后才能下訂單哦!
一、介紹
什么是并發?
并發的本質就是切換+保存狀態
cpu正在運行一個任務,會在兩種情況下切走去執行其他的任務(切換由操作系統強制控制):
1.任務發生阻塞
2.計算任務時間過長,需要讓出cpu給高優先級的程序
協程,又稱微線程,是一種用戶態的輕量級線程。協程能保留上一次調用時的狀態,每次過程重入時,就相當于進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置,當程序中存在大量不需要CPU的操作時(IO),適用于協程。
協程本質上就是一個線程,以前線程任務的切換是由操作系統控制的,遇到I/O自動切換,現在我們用協程的目的就是較少操作系統切換的開銷(開關線程,創建寄存器、堆棧等,在他們之間進行切換等),在我們自己的程序里面來控制任務的切換
進程有三種狀態,而線程是進程的執行最小單位,所以也是線程的三種狀態
二、協程切換
1.yield是一種在單線程下可以保存任務運行狀態的方法
1. yiled 可以保存狀態,yield的狀態保存與操作系統的保存線程狀態很像,但是yield是代碼級別控制的,更輕量級
2. send 可以把一個函數的結果傳給另外一個函數,以此實現單線程內程序之間的切換
通過yield實現任務切換+保存線程:
import time
def func1():
for i in range(11):
print('func1第%s次打印' % i)
time.sleep(1)
def func2():
g = func1()
for k in range(10):
print('func2第%s次打印' % k)
time.sleep(1)
順序打印func1,func2
func2()
yield切換
import time
def func1():
for i in range(11):
yield
print('func1第%s次打印' % i)
time.sleep(1)
def func2():
g = func1()
next(g)
for k in range(10):
print('func2第%s次打印' % k)
time.sleep(1)
func2()
只打印func2,yield會保存func1的狀態,io阻塞
def consumer():
'''任務1:接收數據,處理數據'''
while True:
x=yield # 只是進行切換,并沒有節省I/O時間
print('處理了數據:',x)
def producer():
'''任務2:生產數據'''
g=consumer()
next(g) # 找到yield位置
for i in range(3):
g.send(i) # 給yield傳值,然后再循環給下一個yield傳值,并且多了切換的程序,比直接串行執行還多了一些步驟,導致執行效率反而更低了
print('發送了數據:',i)
start=time.time()
#基于yield保存狀態,實現兩個任務直接來回切換,即并發的效果
producer() #我在當前線程中只執行了這個函數,但是通過這個函數里面的send切換了另外一個任務
stop=time.time()
print(stop-start)
result:
處理了數據: 0
發送了數據: 0
處理了數據: 1
發送了數據: 1
處理了數據: 2
發送了數據: 2
沒有I/O,單純切換任務,會降低程序性能
注: yield并不能檢測io,實現自動切換
import time
def func1():
while True:
print('func1')
yield
def func2():
g = func1()
for i in range(1000):
#i + 1
next(g)
time.sleep(3)
print('func2')
start = time.time()
func2()
stop = time.time()
print(stop - start)
因為func2方法time.sleep 阻塞,會切換到func1執行
協程就是告訴Cpython解釋器,不是搞了個GIL鎖嗎,那好,我就自己搞成一個線程讓你去執行,省去你切換線程的時間,我自己切換比你切換要快很多,避免了很多的開銷。
對于單線程下,我們不可避免程序中出現io操作,但如果我們能在自己的程序中(即用戶程序級別,而非操作系統級別)控制單線程下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該線程能夠最大限度地處于就緒態,即隨時都可以被cpu執行的狀態,相當于我們在用戶程序級別將自己的io操作最大限度地隱藏起來,從而可以迷惑操作系統,讓其看到:該線程好像是一直在計算,io比較少,從而更多的將cpu的執行權限分配給我們的線程
以上內容從其他文章粘貼
三、線程、協程對比
1.python的線程屬于內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行)
2.單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
對比操作系統控制線程的切換,用戶在單線程內控制協程的切換
優點:
缺點:
1.協程屬于單線程,無法利用多核優勢,可以用多進程+多線程+協程實現
2.協程也是單線程下運行,一旦阻塞,將阻塞整個線程
協程特點:
1.單線程下運行實現并發
2.修改數據不需要加鎖(線程需要)
3.用戶程序控制上下文切換
4.附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))
四、Greenlet
如果我們在單個線程內有多個任務,要想實現在多個任務之間切換,使用yield生成器的方式過于麻煩(需要先得到初始化一次的生成器,然后再調用send。。。非常麻煩),而使用greenlet模塊可以非常簡單地實現多個任務直接的切換
pip3 install greenlet
from greenlet import greenlet
def eat(name):
print('%s eat 1' % name)
g2.switch('上海')
print('%s eat 2' % name)
g2.switch()
def play(name):
print('%s play 1' % name)
g1.switch()
print('%s play 2' % name)
g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch('beijing') # 第一次需要傳參,以后都不需要
單純的切換(在沒有io的情況下或者沒有重復開辟內存空間的操作),反而會降低程序的執行速度
#順序執行
import time
def f1():
res=1
for i in range(100000000):
res+=i
def f2():
res=1
for i in range(100000000):
res*=i
start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) # 8.795756101608276
#切換
from greenlet import greenlet
import time
def f1():
res=1
for i in range(100000000):
res+=i
g2.switch()
def f2():
res=1
for i in range(100000000):
res*=i
g1.switch()
start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 45.937793016433716
greenlet只是提供了一種比generator(yield)更加便捷的切換方式,當切到一個任務執行時如果遇到IO,那就原地阻塞(不能識別io),仍然是沒有解決遇到IO自動切換來提升效率的問題
單線程里的多個任務的代碼通常既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。如此,才能提高效率,這就用到了Gevent模塊
五、Gevent
Gevent是一個第三方庫,可以輕松通過gevent實現并發同步或異步編程,在gevent中用到的主要模式是Greenlet,它是以C擴展模塊形式接入Python的輕量級協程。Greenlet全部運行在主程序操作系統進程的內部,但他們被協作式地調度
安裝:
pip3 install gevent
用法:
g1=gevent.spawn(func,1,2,3,x=4,y=5)
#創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,后面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的,spawn是異步提交任務
g2=gevent.spawn(func2)
g1.join() #等待g1結束
g2.join() #等待g2結束 有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,但是你會發現,如果g2里面的任務執行的時間長,但是不寫join的話,就不會執行完等到g2剩下的任務了
#或者上述兩步合作一步:
gevent.joinall([g1,g2])
g1.value #拿到func1的返回值
import gevent
def eat(name):
print('%s eat 1' % name)
gevent.sleep(2)
print('%s eat 2' % name)
def play(name):
print('%s play 1' % name)
gevent.sleep(1)
print('%s play 2' % name)
g1 = gevent.spawn(eat, 'xxx')
g2 = gevent.spawn(play, name='xxx')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('over')
上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞;
而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了
from gevent import monkey;
monkey.patch_all() #必須放到被打補丁者的前面,如time,socket模塊之前
from gevent import monkey
monkey.patch_all() # 必須寫在最上面,否則可能識別不了io
import gevent
import time
def eat():
# print()
print('eat food 1')
time.sleep(2) # 加上monkey就能夠識別到time模塊的sleep了
print('eat food 2')
def play():
print('play 1')
time.sleep(1) # 來回切換,直到一個I/O的時間結束,這里都是我們個gevent做得,不再是控制不了的操作系統了。
print('play 2')
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1,g2])
print('over')
六、同步、異步
from gevent import spawn, joinall,monkey
monkey.patch_all()
import time
def task(pid):
time.sleep(0.5)
print('Task %s done' % pid)
def sync():
for i in range(10):
task(i)
def asyncous():
g_list = [spawn(task,i) for i in range(10)]
joinall(g_list)
if __name__ == '__main__':
print('sync')
sync()
# 對比發現執行速度
print('async')
asyncous()
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。