您好,登錄后才能下訂單哦!
這篇文章主要介紹Python并發編程的示例分析,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
0x01 multipleprocessing
與使用線程的 threading 模塊類似, multipleprocessing
模塊提供許多高級 API 。最常見的是 Pool 對象了,使用它的接口能很方便地寫出并發執行的代碼。
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是將f()方法并發地映射到列表中的每個元素 print(p.map(f, [1, 2, 3])) # 執行結果 # [1, 4, 9]
關于 Pool 下文中還會提到,這里我們先來看 Process 。
Process
要創建一個進程可以使用 Process 類,使用 start() 方法啟動進程。
from multiprocessing import Process import os def echo(text): # 父進程ID print("Process Parent ID : ", os.getppid()) # 進程ID print("Process PID : ", os.getpid()) print('echo : ', text) if __name__ == '__main__': p = Process(target=echo, args=('hello process',)) p.start() p.join() # 執行結果 # Process Parent ID : 27382 # Process PID : 27383 # echo : hello process
進程池
正如開篇提到的 multiprocessing
模塊提供了 Pool 類可以很方便地實現一些簡單多進程場景。 它主要有以下接口
apply(func[, args[, kwds]])
執行 func(args,kwds) 方法,在方法結束返回前會阻塞。
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
異步執行 func(args,kwds) ,會立即返回一個 result 對象,如果指定了 callback 參數,結果會通過回調方法返回,還可以指定執行出錯的回調方法 error_callback()
map(func, iterable[, chunksize])
類似內置函數 map() ,可以并發執行 func ,是同步方法
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
異步版本的 map
close()
關閉進程池。當池中的所有工作進程都執行完畢時,進程會退出。
terminate()
終止進程池
join()
等待工作進程執行完,必需先調用 close() 或者 terminate()
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是將f()方法并發地映射到列表中的每個元素 a = p.map(f, [1, 2, 3]) print(a) # 異步執行map b = p.map_async(f, [3, 5, 7, 11]) # b 是一個result對象,代表方法的執行結果 print(b) # 為了拿到結果,使用join方法等待池中工作進程退出 p.close() # 調用join方法前,需先執行close或terminate方法 p.join() # 獲取執行結果 print(b.get()) # 執行結果 # [1, 4, 9] # <multiprocessing.pool.MapResult object at 0x10631b710> # [9, 25, 49, 121]
map_async() 和 apply_async() 執行后會返回一個 class multiprocessing.pool.AsyncResult 對象,通過它的 get() 可以獲取到執行結果, ready() 可以判斷 AsyncResult 的結果是否準備好。
進程間數據的傳輸
multiprocessing 模塊提供了兩種方式用于進程間的數據共享:隊列( Queue )和管道( Pipe )
Queue 是線程安全,也是進程安全的。使用 Queue 可以實現進程間的數據共享,例如下面的 demo 中子進程 put 一個對象,在主進程中就能 get 到這個對象。 任何可以序列化的對象都可以通過 Queue 來傳輸。
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': # 使用Queue進行數據通信 q = Queue() p = Process(target=f, args=(q,)) p.start() # 主進程取得子進程中的數據 print(q.get()) # prints "[42, None, 'hello']" p.join() # 執行結果 # [42, None, 'hello']
Pipe() 返回一對通過管道連接的 Connection 對象。這兩個對象可以理解為管道的兩端,它們通過 send() 和 recv() 發送和接收數據。
from multiprocessing import Process, Pipe def write(conn): # 子進程中發送一個對象 conn.send([42, None, 'hello']) conn.close() def read(conn): # 在讀的進程中通過recv接收對象 data = conn.recv() print(data) if __name__ == '__main__': # Pipe()方法返回一對連接對象 w_conn, r_conn = Pipe() wp = Process(target=write, args=(w_conn,)) rp = Process(target=read, args=(r_conn,)) wp.start() rp.start() # 執行結果 # [42, None, 'hello']
需要注意的是,兩個進程不能同時對一個連接對象進行 send 或 recv 操作。
同步
我們知道線程間的同步是通過鎖機制來實現的,進程也一樣。
from multiprocessing import Process, Lock import time def print_with_lock(l, i): l.acquire() try: time.sleep(1) print('hello world', i) finally: l.release() def print_without_lock(i): time.sleep(1) print('hello world', i) if __name__ == '__main__': lock = Lock() # 先執行有鎖的 for num in range(5): Process(target=print_with_lock, args=(lock, num)).start() # 再執行無鎖的 # for num in range(5): # Process(target=print_without_lock, args=(num,)).start()
有鎖的代碼將每秒依次打印
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
如果執行無鎖的代碼,則在我的電腦上執行結果是這樣的
hello worldhello world 0
1
hello world 2
hello world 3
hello world 4
除了 Lock ,還包括 RLock 、 Condition 、 Semaphore 和 Event 等進程間的同步原語。其用法也與線程間的同步原語很類似。 API 使用可以參考文末中引用的文檔鏈接。
在工程中實現進程間的數據共享應當優先使用 隊列或管道。
以上是“Python并發編程的示例分析”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。